From ab2a67f4d28536af86f8f504338f42d2b72701b7 Mon Sep 17 00:00:00 2001 From: Clifford Tawiah Date: Mon, 23 Mar 2026 14:37:30 -0400 Subject: [PATCH] feat: annotate active OTel spans with AI SDK metrics and config metadata MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Every tracker method (track_duration, track_tokens, track_success, etc.) now writes the same metrics onto the active OpenTelemetry span in addition to firing LD analytics events. This gives users correlated LLM observability data in their tracing backend without any extra code. Key additions: - `LDAIObserveConfig` dataclass — controls span annotation behaviour (`annotate_spans`, `create_span_if_none`), passed to `LDAIClient`. - `observe.py` — span annotation helpers, baggage helpers, and `LDAIBaggageSpanProcessor` for propagating AI Config metadata through OTel context (useful with auto-instrumented LLM libraries). - `LDAIClient.config_scope()` — context manager that evaluates an AI Config and scopes its metadata as OTel baggage for the duration of the block, so downstream spans inherit AI Config identity. - `opentelemetry-api` added as an optional dependency (`pip install launchdarkly-server-sdk-ai[otel]`). All OTel code is guarded behind availability checks — zero impact when the package is not installed. - Both `LDAIConfigTracker` and `AIGraphTracker` annotate spans for every metric they track: duration, tokens, TTFT, success/error, feedback, eval scores, judge responses, graph invocation, handoffs, etc. Made-with: Cursor --- packages/sdk/server-ai/pyproject.toml | 10 + packages/sdk/server-ai/src/ldai/__init__.py | 3 + .../sdk/server-ai/src/ldai/chat/__init__.py | 61 ++- packages/sdk/server-ai/src/ldai/client.py | 72 ++- packages/sdk/server-ai/src/ldai/observe.py | 409 ++++++++++++++++++ packages/sdk/server-ai/src/ldai/otel.py | 20 + packages/sdk/server-ai/src/ldai/tracker.py | 101 ++++- packages/sdk/server-ai/tests/test_observe.py | 243 +++++++++++ 8 files changed, 874 insertions(+), 45 deletions(-) create mode 100644 packages/sdk/server-ai/src/ldai/observe.py create mode 100644 packages/sdk/server-ai/src/ldai/otel.py create mode 100644 packages/sdk/server-ai/tests/test_observe.py diff --git a/packages/sdk/server-ai/pyproject.toml b/packages/sdk/server-ai/pyproject.toml index b7a868b..4a445fa 100644 --- a/packages/sdk/server-ai/pyproject.toml +++ b/packages/sdk/server-ai/pyproject.toml @@ -27,6 +27,14 @@ packages = [{ include = "ldai", from = "src" }] python = ">=3.9,<4" launchdarkly-server-sdk = ">=9.4.0" chevron = "=0.14.0" +opentelemetry-api = {version = ">=1.0.0", optional = true} + +[tool.poetry.extras] +# Install with: pip install launchdarkly-server-sdk-ai[otel] +# Enables span annotation in LDAIConfigTracker and the config_scope() context +# manager on LDAIClient. LDAIBaggageSpanProcessor additionally requires +# opentelemetry-sdk to be installed by the application. +otel = ["opentelemetry-api"] [tool.poetry.group.dev.dependencies] @@ -37,6 +45,8 @@ pytest-asyncio = ">=0.21.0" mypy = "==1.18.2" pycodestyle = "^2.12.1" isort = ">=5.13.2,<7.0.0" +opentelemetry-api = "^1.40.0" +opentelemetry-sdk = "^1.40.0" [tool.poetry.group.docs] diff --git a/packages/sdk/server-ai/src/ldai/__init__.py b/packages/sdk/server-ai/src/ldai/__init__.py index cdd7a00..24cf4d7 100644 --- a/packages/sdk/server-ai/src/ldai/__init__.py +++ b/packages/sdk/server-ai/src/ldai/__init__.py @@ -13,6 +13,7 @@ Edge, JudgeConfiguration, LDAIAgent, LDAIAgentConfig, LDAIAgentDefaults, LDMessage, ModelConfig, ProviderConfig) from ldai.providers.types import EvalScore, JudgeResponse +from ldai.observe import LDAIBaggageSpanProcessor, LDAIObserveConfig from ldai.tracker import AIGraphTracker __all__ = [ @@ -23,6 +24,8 @@ 'AIAgents', 'AIAgentGraphConfig', 'AIGraphTracker', + 'LDAIBaggageSpanProcessor', + 'LDAIObserveConfig', 'Edge', 'AICompletionConfig', 'AICompletionConfigDefault', diff --git a/packages/sdk/server-ai/src/ldai/chat/__init__.py b/packages/sdk/server-ai/src/ldai/chat/__init__.py index c826fed..4201602 100644 --- a/packages/sdk/server-ai/src/ldai/chat/__init__.py +++ b/packages/sdk/server-ai/src/ldai/chat/__init__.py @@ -6,6 +6,7 @@ from ldai import log from ldai.judge import Judge from ldai.models import AICompletionConfig, LDMessage +from ldai.observe import _span_scope, annotate_span_with_ai_config_metadata from ldai.providers.ai_provider import AIProvider from ldai.providers.types import ChatResponse, JudgeResponse from ldai.tracker import LDAIConfigTracker @@ -50,29 +51,44 @@ async def invoke(self, prompt: str) -> ChatResponse: :param prompt: The user prompt to send to the chat model :return: ChatResponse containing the model's response and metrics """ - # Convert prompt string to LDMessage with role 'user' and add to conversation history user_message: LDMessage = LDMessage(role='user', content=prompt) self._messages.append(user_message) - # Prepend config messages to conversation history for model invocation config_messages = self._ai_config.messages or [] all_messages = config_messages + self._messages - # Delegate to provider-specific implementation with tracking - response = await self._tracker.track_metrics_of( - lambda: self._provider.invoke_model(all_messages), - lambda result: result.metrics, - ) - - # Start judge evaluations as async tasks (don't await them) - if ( - self._ai_config.judge_configuration - and self._ai_config.judge_configuration.judges - and len(self._ai_config.judge_configuration.judges) > 0 - ): - response.evaluations = self._start_judge_evaluations(self._messages, response) - - # Add the response message to conversation history + observe_config = self._tracker._observe_config + create_if_none = observe_config.annotate_spans and observe_config.create_span_if_none + + # Open (or reuse) a span for the full invoke — LLM call AND judge task + # creation must happen inside this block so that asyncio.create_task() + # captures the active span in its context copy. Judge spans created + # later in those tasks will then be correctly parented to this span. + with _span_scope("ld.ai.completion", create_if_none=create_if_none): + if observe_config.annotate_spans: + annotate_span_with_ai_config_metadata( + self._ai_config.key, + self._tracker._variation_key, + self._tracker._model_name, + self._tracker._provider_name, + version=self._tracker._version, + context_key=self._tracker._context.key, + enabled=self._tracker._enabled, + ) + + response = await self._tracker.track_metrics_of( + lambda: self._provider.invoke_model(all_messages), + lambda result: result.metrics, + ) + + # Create judge tasks INSIDE the span scope so asyncio.create_task() + # snapshots the context while the completion span is still active. + if ( + self._ai_config.judge_configuration + and self._ai_config.judge_configuration.judges + ): + response.evaluations = self._start_judge_evaluations(self._messages, response) + self._messages.append(response.message) return response @@ -113,9 +129,18 @@ async def evaluate_judge(judge_config): return eval_result + observe_config = self._tracker._observe_config + create_judge_span = observe_config.annotate_spans and observe_config.create_span_if_none + + async def evaluate_judge_with_span(judge_config): + # Open the ld.ai.judge span BEFORE the judge LLM call so the + # judge's openai.chat span is nested inside it, not beside it. + with _span_scope("ld.ai.judge", create_if_none=create_judge_span): + return await evaluate_judge(judge_config) + # Create tasks for each judge evaluation tasks = [ - asyncio.create_task(evaluate_judge(judge_config)) + asyncio.create_task(evaluate_judge_with_span(judge_config)) for judge_config in judge_configs ] diff --git a/packages/sdk/server-ai/src/ldai/client.py b/packages/sdk/server-ai/src/ldai/client.py index 8289d06..38d7c62 100644 --- a/packages/sdk/server-ai/src/ldai/client.py +++ b/packages/sdk/server-ai/src/ldai/client.py @@ -1,10 +1,12 @@ -from typing import Any, Dict, List, Optional, Tuple +from contextlib import contextmanager +from typing import Any, Dict, Generator, List, Optional, Tuple import chevron from ldclient import Context from ldclient.client import LDClient from ldai import log +from ldai.observe import LDAIObserveConfig, detach_ai_config_baggage, set_ai_config_baggage from ldai.agent_graph import AgentGraphDefinition from ldai.chat import Chat from ldai.judge import Judge @@ -32,8 +34,9 @@ class LDAIClient: """The LaunchDarkly AI SDK client object.""" - def __init__(self, client: LDClient): + def __init__(self, client: LDClient, observe: Optional[LDAIObserveConfig] = None): self._client = client + self._observe_config = observe if observe is not None else LDAIObserveConfig() self._client.track( _TRACK_SDK_INFO, _INIT_TRACK_CONTEXT, @@ -91,6 +94,60 @@ def completion_config( key, context, default or AICompletionConfigDefault.disabled(), variables ) + @contextmanager + def config_scope( + self, + key: str, + context: Context, + default: Optional[AICompletionConfigDefault] = None, + variables: Optional[Dict[str, Any]] = None, + ) -> Generator[AICompletionConfig, None, None]: + """ + Context manager that evaluates an AI Config and scopes its metadata to + the OTel context for the duration of the block. + + While inside the block, any OTel span that is started (including spans + created automatically by OpenLLMetry or other auto-instrumentation) will + have the AI Config key, variation key, model, and provider stamped on it + as span attributes by LDAIBaggageSpanProcessor, if that processor is + registered. + + This solves the context propagation problem: when completion_config() is + called at one point in the code and the LLM call happens later, deep in + the call stack, the baggage propagates automatically so the two can be + correlated in LaunchDarkly. + + Example:: + + with aiclient.config_scope("my-ai-config", context) as config: + if config.enabled: + # LLM call can be anywhere inside this block, even in a + # helper function several layers down. OpenLLMetry's + # auto-instrumented span will carry ld.ai_config.key. + response = openai_client.chat.completions.create( + model=config.model.name, + messages=build_messages(config.messages, history), + ) + config.tracker.track_openai_metrics(lambda: response) + + :param key: The key of the completion configuration. + :param context: The context to evaluate the completion configuration in. + :param default: The default value of the completion configuration. + :param variables: Additional variables for the completion configuration. + :return: Generator yielding the evaluated AICompletionConfig. + """ + config = self.completion_config(key, context, default, variables) + + model_name = config.model.name if config.model else "" + provider_name = config.provider.name if config.provider else "" + variation_key = config.tracker._variation_key if config.tracker else "" + + _, token = set_ai_config_baggage(key, variation_key, model_name, provider_name) + try: + yield config + finally: + detach_ai_config_baggage(token) + def config( self, key: str, @@ -661,18 +718,21 @@ def __evaluate( custom=custom ) + ld_meta = variation.get('_ldMeta', {}) + enabled = ld_meta.get('enabled', False) + tracker = LDAIConfigTracker( self._client, - variation.get('_ldMeta', {}).get('variationKey', ''), + ld_meta.get('variationKey', ''), key, - int(variation.get('_ldMeta', {}).get('version', 1)), + int(ld_meta.get('version', 1)), model.name if model else '', provider_config.name if provider_config else '', context, + observe_config=self._observe_config, + enabled=bool(enabled), ) - enabled = variation.get('_ldMeta', {}).get('enabled', False) - judge_configuration = None if 'judgeConfiguration' in variation and isinstance(variation['judgeConfiguration'], dict): judge_config = variation['judgeConfiguration'] diff --git a/packages/sdk/server-ai/src/ldai/observe.py b/packages/sdk/server-ai/src/ldai/observe.py new file mode 100644 index 0000000..930984b --- /dev/null +++ b/packages/sdk/server-ai/src/ldai/observe.py @@ -0,0 +1,409 @@ +""" +LLM observability integration for the LaunchDarkly AI Config SDK. + +This module provides: + +1. **LDAIObserveConfig** — developer-friendly dataclass that controls how the SDK + writes LLM metrics and AI Config metadata onto OpenTelemetry spans. + Pass it to LDAIClient to opt in/out of features:: + + from ldai import LDAIClient + from ldai.observe import LDAIObserveConfig + + # defaults: annotate active spans, create an internal span when none exists + aiclient = LDAIClient(ld_client) + + # disable all span annotation (LD analytics events still fire) + aiclient = LDAIClient(ld_client, observe=LDAIObserveConfig(annotate_spans=False)) + + # annotate active spans only; don't create internal spans + aiclient = LDAIClient(ld_client, observe=LDAIObserveConfig(create_span_if_none=False)) + +2. **Span annotation helpers** — write LLM metrics (tokens, duration, success, + feedback) and AI Config metadata onto the currently active OTel span. + No-ops when opentelemetry-api is not installed. + +3. **LDAIBaggageSpanProcessor** — a SpanProcessor that copies LaunchDarkly AI + Config metadata from OTel baggage onto every new span. Useful when using + config_scope() with auto-instrumented LLM libraries (e.g. OpenLLMetry):: + + from opentelemetry.sdk.trace import TracerProvider + from ldai.observe import LDAIBaggageSpanProcessor + + provider = TracerProvider() + provider.add_span_processor(LDAIBaggageSpanProcessor()) + +All public symbols in this module are safe to call when opentelemetry-api is +not installed — they silently do nothing. LDAIBaggageSpanProcessor requires +opentelemetry-sdk. +""" + +from contextlib import contextmanager +from dataclasses import dataclass + +try: + from opentelemetry import baggage as _otel_baggage + from opentelemetry import context as _otel_context + from opentelemetry import trace as _otel_trace + from opentelemetry.trace import StatusCode + _OTEL_AVAILABLE = True +except ImportError: + _OTEL_AVAILABLE = False + StatusCode = None # type: ignore[assignment] + +# LDAIBaggageSpanProcessor subclasses SpanProcessor from the OTel SDK when +# available. When the SDK is not installed we fall back to object so the +# class can still be imported without error. +try: + from opentelemetry.sdk.trace import SpanProcessor as _SpanProcessorBase + _SDK_AVAILABLE = True +except ImportError: + _SpanProcessorBase = object # type: ignore[assignment,misc] + _SDK_AVAILABLE = False + + +# --------------------------------------------------------------------------- +# Developer-facing configuration +# --------------------------------------------------------------------------- + +@dataclass +class LDAIObserveConfig: + """ + Controls how the LaunchDarkly AI SDK writes observability data onto spans. + + Pass an instance to :class:`ldai.LDAIClient` at construction time:: + + from ldai import LDAIClient + from ldai.observe import LDAIObserveConfig + + # All defaults — recommended for most applications + aiclient = LDAIClient(ld_client) + + # Disable span annotation; LD analytics events still fire normally + aiclient = LDAIClient(ld_client, observe=LDAIObserveConfig(annotate_spans=False)) + + # Annotate existing spans only; don't create an internal span when + # no OTel span is active at call time + aiclient = LDAIClient(ld_client, observe=LDAIObserveConfig(create_span_if_none=False)) + + Attributes: + annotate_spans: When True (default), the SDK writes AI Config metadata + (key, variation, model, provider) and LLM metrics (token counts, + duration, success/error, feedback) as attributes onto the active + OTel span. Set to False to disable all span annotation while + keeping LaunchDarkly analytics tracking intact. + + create_span_if_none: When True (default) and ``annotate_spans`` is also + True, the SDK creates an internal ``ld.ai.completion`` span when no + OTel span is active at the time of the LLM call. The span is + exported through whatever ``TracerProvider`` is globally registered + (e.g. the LaunchDarkly Observability plugin). Set to False if you + only want to annotate spans you create yourself. + """ + + annotate_spans: bool = True + create_span_if_none: bool = True + + +# --------------------------------------------------------------------------- +# Baggage key constants +# --------------------------------------------------------------------------- + +_BAGGAGE_CONFIG_KEY = "ld.ai_config.key" +_BAGGAGE_VARIATION_KEY = "ld.ai_config.variation_key" +_BAGGAGE_MODEL_KEY = "ld.ai_config.model" +_BAGGAGE_PROVIDER_KEY = "ld.ai_config.provider" + +_INTERNAL_SPAN_NAME = "ld.ai.completion" +_TRACER_NAME = "launchdarkly-server-sdk-ai" + + +# --------------------------------------------------------------------------- +# Internal helpers +# --------------------------------------------------------------------------- + +def _get_recording_span(): + """Return the active OTel span if it is recording, otherwise None.""" + if not _OTEL_AVAILABLE: + return None + span = _otel_trace.get_current_span() + if span is None or not span.is_recording(): + return None + return span + + +@contextmanager +def _span_scope(name: str = _INTERNAL_SPAN_NAME, create_if_none: bool = True): + """ + Context manager that ensures an active recording span for its duration. + + - If a recording span already exists it is yielded as-is (no new span). + - If no recording span exists and ``create_if_none`` is True, an internal + span is created via the global TracerProvider and made current. + - Otherwise yields None; all annotation calls inside will be no-ops. + + Requires opentelemetry-sdk when creating a new span; safe to call when + only opentelemetry-api is installed (falls back to yield None). + """ + span = _get_recording_span() + if span is not None: + yield span + elif create_if_none and _SDK_AVAILABLE and _OTEL_AVAILABLE: + tracer = _otel_trace.get_tracer(_TRACER_NAME) + with tracer.start_as_current_span(name) as new_span: + yield new_span + else: + yield None + + +# --------------------------------------------------------------------------- +# Span annotation helpers (called by LDAIConfigTracker) +# --------------------------------------------------------------------------- + +def annotate_span_with_ai_config_metadata( + config_key: str, + variation_key: str, + model_name: str, + provider_name: str, + version: int = 0, + context_key: str = "", + enabled: bool = True, +) -> None: + """ + Write AI Config identity attributes onto the currently active OTel span. + + Attributes written: + ld.ai_config.key — AI Config flag key + ld.ai_config.variation_key — evaluated variation key + ld.ai_config.version — variation version + ld.ai_config.context_key — LaunchDarkly context key + ld.ai_config.enabled — whether the AI Config is enabled (mode) + ld.ai_config.model — model name (omitted when empty) + ld.ai_config.provider — provider name (omitted when empty) + + No-op when opentelemetry-api is not installed or no recording span is active. + """ + span = _get_recording_span() + if span is None: + return + span.set_attribute("ld.ai_config.key", config_key) + span.set_attribute("ld.ai_config.variation_key", variation_key) + if version: + span.set_attribute("ld.ai_config.version", version) + if context_key: + span.set_attribute("ld.ai_config.context_key", context_key) + span.set_attribute("ld.ai_config.enabled", enabled) + if model_name: + span.set_attribute("ld.ai_config.model", model_name) + if provider_name: + span.set_attribute("ld.ai_config.provider", provider_name) + + +def annotate_span_with_tokens(total: int, input_tokens: int, output_tokens: int) -> None: + """ + Write token usage attributes onto the currently active OTel span. + + ld.ai.metrics.tokens.total — total token count + ld.ai.metrics.tokens.input — prompt / input tokens + ld.ai.metrics.tokens.output — completion / output tokens + + No-op when opentelemetry-api is not installed or no recording span is active. + """ + span = _get_recording_span() + if span is None: + return + if total > 0: + span.set_attribute("ld.ai.metrics.tokens.total", total) + if input_tokens > 0: + span.set_attribute("ld.ai.metrics.tokens.input", input_tokens) + if output_tokens > 0: + span.set_attribute("ld.ai.metrics.tokens.output", output_tokens) + + +def annotate_span_with_duration(duration_ms: int) -> None: + """ + Write ``ld.ai.metrics.duration_ms`` onto the currently active OTel span. + + No-op when opentelemetry-api is not installed or no recording span is active. + """ + span = _get_recording_span() + if span is None: + return + span.set_attribute("ld.ai.metrics.duration_ms", duration_ms) + + +def annotate_span_with_ttft(ttft_ms: int) -> None: + """ + Write ``ld.ai.metrics.time_to_first_token_ms`` onto the currently active OTel span. + + No-op when opentelemetry-api is not installed or no recording span is active. + """ + span = _get_recording_span() + if span is None: + return + span.set_attribute("ld.ai.metrics.time_to_first_token_ms", ttft_ms) + + +def annotate_span_success(success: bool) -> None: + """ + Set the active span status to OK or ERROR. + + No-op when opentelemetry-api is not installed or no recording span is active. + """ + if not _OTEL_AVAILABLE: + return + span = _get_recording_span() + if span is None: + return + span.set_status(StatusCode.OK if success else StatusCode.ERROR) + + +def annotate_span_with_feedback(kind: str) -> None: + """ + Write ``ld.ai.metrics.feedback.kind`` onto the currently active OTel span. + + No-op when opentelemetry-api is not installed or no recording span is active. + """ + span = _get_recording_span() + if span is None: + return + span.set_attribute("ld.ai.metrics.feedback.kind", kind) + + +def annotate_span_with_judge_response(judge_response) -> None: + """ + Write judge evaluation results onto the currently active OTel span. + + For each eval in the response, two attributes are written using the + sanitized metric key as a namespace: + + ld.ai.judge..score — numeric score between 0 and 1 + ld.ai.judge..reasoning — reasoning text + + Plus top-level judge attributes: + + ld.ai.judge.config_key — key of the judge AI Config + ld.ai.judge.success — whether the evaluation completed successfully + ld.ai.judge.error — error message (only when evaluation failed) + + Metric keys like ``$ld:ai:judge:relevance`` are sanitized to + ``relevance`` (``$`` stripped, ``:``-separated segments, last segment used). + + No-op when opentelemetry-api is not installed or no recording span is active. + """ + span = _get_recording_span() + if span is None: + return + + if judge_response.judge_config_key: + span.set_attribute("ld.ai.judge.config_key", judge_response.judge_config_key) + span.set_attribute("ld.ai.judge.success", judge_response.success) + if judge_response.error: + span.set_attribute("ld.ai.judge.error", judge_response.error) + + for metric_key, eval_score in (judge_response.evals or {}).items(): + # Sanitize metric key: strip leading '$', use last ':'-separated segment + clean = metric_key.lstrip("$").split(":")[-1] if metric_key else metric_key + span.set_attribute(f"ld.ai.judge.{clean}.score", eval_score.score) + if eval_score.reasoning: + span.set_attribute(f"ld.ai.judge.{clean}.reasoning", eval_score.reasoning) + + +# --------------------------------------------------------------------------- +# Baggage helpers (used by LDAIClient.config_scope()) +# --------------------------------------------------------------------------- + +def set_ai_config_baggage( + config_key: str, + variation_key: str, + model_name: str, + provider_name: str, +): + """ + Attach AI Config metadata to the active OTel context via baggage. + + Returns ``(ctx, token)``. The token must be passed to + :func:`detach_ai_config_baggage` to clean up. Returns ``(None, None)`` + when opentelemetry-api is not installed. + """ + if not _OTEL_AVAILABLE: + return None, None + + ctx = _otel_baggage.set_baggage(_BAGGAGE_CONFIG_KEY, config_key) + ctx = _otel_baggage.set_baggage(_BAGGAGE_VARIATION_KEY, variation_key, context=ctx) + if model_name: + ctx = _otel_baggage.set_baggage(_BAGGAGE_MODEL_KEY, model_name, context=ctx) + if provider_name: + ctx = _otel_baggage.set_baggage(_BAGGAGE_PROVIDER_KEY, provider_name, context=ctx) + + token = _otel_context.attach(ctx) + return ctx, token + + +def detach_ai_config_baggage(token) -> None: + """ + Remove AI Config baggage from the OTel context. + + No-op when opentelemetry-api is not installed or token is None. + """ + if not _OTEL_AVAILABLE or token is None: + return + _otel_context.detach(token) + + +# --------------------------------------------------------------------------- +# LDAIBaggageSpanProcessor +# --------------------------------------------------------------------------- + +class LDAIBaggageSpanProcessor(_SpanProcessorBase): + """ + An OTel SpanProcessor that copies LaunchDarkly AI Config metadata from + OTel baggage onto every new span as span attributes. + + Useful when using :meth:`LDAIClient.config_scope` together with + auto-instrumented LLM libraries (e.g. OpenLLMetry), so that spans created + inside the scope automatically carry AI Config metadata. + + Baggage key -> Span attribute + ld.ai_config.key -> ld.ai_config.key + ld.ai_config.variation_key -> ld.ai_config.variation_key + ld.ai_config.model -> ld.ai_config.model + ld.ai_config.provider -> ld.ai_config.provider + + Register once at application startup:: + + from opentelemetry.sdk.trace import TracerProvider + from ldai.observe import LDAIBaggageSpanProcessor + + provider = TracerProvider() + provider.add_span_processor(LDAIBaggageSpanProcessor()) + trace.set_tracer_provider(provider) + + Requires opentelemetry-sdk (not just opentelemetry-api). + """ + + _BAGGAGE_TO_ATTRIBUTE = { + _BAGGAGE_CONFIG_KEY: "ld.ai_config.key", + _BAGGAGE_VARIATION_KEY: "ld.ai_config.variation_key", + _BAGGAGE_MODEL_KEY: "ld.ai_config.model", + _BAGGAGE_PROVIDER_KEY: "ld.ai_config.provider", + } + + def on_start(self, span, parent_context=None): + """Copy LD AI Config baggage entries onto the starting span as attributes.""" + if not _OTEL_AVAILABLE: + return + ctx = parent_context if parent_context is not None else _otel_context.get_current() + for baggage_key, attr_key in self._BAGGAGE_TO_ATTRIBUTE.items(): + value = _otel_baggage.get_baggage(baggage_key, context=ctx) + if value: + span.set_attribute(attr_key, value) + + def on_end(self, span): + pass + + def shutdown(self): + pass + + def force_flush(self, timeout_millis: int = 30000): + pass diff --git a/packages/sdk/server-ai/src/ldai/otel.py b/packages/sdk/server-ai/src/ldai/otel.py new file mode 100644 index 0000000..b9a2456 --- /dev/null +++ b/packages/sdk/server-ai/src/ldai/otel.py @@ -0,0 +1,20 @@ +""" +Backward-compatibility shim — import from ldai.observe instead. + +LDAIOtelConfig is a deprecated alias for LDAIObserveConfig. +""" + +from ldai.observe import ( # noqa: F401 + LDAIObserveConfig as LDAIOtelConfig, + LDAIBaggageSpanProcessor, + annotate_span_with_ai_config_metadata, + annotate_span_with_tokens, + annotate_span_with_duration, + annotate_span_with_ttft, + annotate_span_success, + annotate_span_with_feedback, + set_ai_config_baggage, + detach_ai_config_baggage, + _span_scope, + _get_recording_span, +) diff --git a/packages/sdk/server-ai/src/ldai/tracker.py b/packages/sdk/server-ai/src/ldai/tracker.py index d4f0912..3295920 100644 --- a/packages/sdk/server-ai/src/ldai/tracker.py +++ b/packages/sdk/server-ai/src/ldai/tracker.py @@ -5,6 +5,18 @@ from ldclient import Context, LDClient +from ldai.observe import ( + LDAIObserveConfig, + _span_scope, + annotate_span_success, + annotate_span_with_ai_config_metadata, + annotate_span_with_duration, + annotate_span_with_feedback, + annotate_span_with_judge_response, + annotate_span_with_tokens, + annotate_span_with_ttft, +) + class FeedbackKind(Enum): """ @@ -77,6 +89,8 @@ def __init__( model_name: str, provider_name: str, context: Context, + observe_config: Optional[LDAIObserveConfig] = None, + enabled: bool = True, ): """ Initialize an AI Config tracker. @@ -88,6 +102,8 @@ def __init__( :param model_name: Name of the model used. :param provider_name: Name of the provider used. :param context: Context for evaluation. + :param observe_config: OTel integration options (defaults to LDAIObserveConfig()). + :param enabled: Whether the AI Config variation is enabled (mode). """ self._ld_client = ld_client self._variation_key = variation_key @@ -96,6 +112,8 @@ def __init__( self._model_name = model_name self._provider_name = provider_name self._context = context + self._observe_config = observe_config if observe_config is not None else LDAIObserveConfig() + self._enabled = enabled self._summary = LDAIMetricSummary() def __get_track_data(self): @@ -119,6 +137,8 @@ def track_duration(self, duration: int) -> None: :param duration: Duration in milliseconds. """ self._summary._duration = duration + if self._observe_config.annotate_spans: + annotate_span_with_duration(duration) self._ld_client.track( "$ld:ai:duration:total", self._context, self.__get_track_data(), duration ) @@ -130,6 +150,8 @@ def track_time_to_first_token(self, time_to_first_token: int) -> None: :param time_to_first_token: Time to first token in milliseconds. """ self._summary._time_to_first_token = time_to_first_token + if self._observe_config.annotate_spans: + annotate_span_with_ttft(time_to_first_token) self._ld_client.track( "$ld:ai:tokens:ttf", self._context, @@ -231,7 +253,10 @@ def track_judge_response(self, judge_response: Any) -> None: from ldai.providers.types import EvalScore, JudgeResponse if isinstance(judge_response, JudgeResponse): - # Track evaluation scores with judge config key included in metadata + if self._observe_config.annotate_spans: + with _span_scope("ld.ai.judge", create_if_none=self._observe_config.create_span_if_none): + annotate_span_with_judge_response(judge_response) + if judge_response.evals: track_data = self.__get_track_data() if judge_response.judge_config_key: @@ -253,6 +278,8 @@ def track_feedback(self, feedback: Dict[str, FeedbackKind]) -> None: :param feedback: Dictionary containing feedback kind. """ self._summary._feedback = feedback + if self._observe_config.annotate_spans: + annotate_span_with_feedback(feedback["kind"].value) if feedback["kind"] == FeedbackKind.Positive: self._ld_client.track( "$ld:ai:feedback:user:positive", @@ -273,6 +300,8 @@ def track_success(self) -> None: Track a successful AI generation. """ self._summary._success = True + if self._observe_config.annotate_spans: + annotate_span_success(True) self._ld_client.track( "$ld:ai:generation:success", self._context, self.__get_track_data(), 1 ) @@ -282,45 +311,73 @@ def track_error(self) -> None: Track an unsuccessful AI generation attempt. """ self._summary._success = False + if self._observe_config.annotate_spans: + annotate_span_success(False) self._ld_client.track( "$ld:ai:generation:error", self._context, self.__get_track_data(), 1 ) def track_openai_metrics(self, func): """ - Track OpenAI-specific operations. - - This function will track the duration of the operation, the token - usage, and the success or error status. + Track an OpenAI chat completion call end-to-end. - If the provided function throws, then this method will also throw. + Wraps ``func`` (a zero-argument callable that returns an OpenAI + ``ChatCompletion`` response) and automatically records: - In the case the provided function throws, this function will record the - duration and an error. + - AI Config metadata on the active span (key, variation, model, provider) + - Token usage (prompt, completion, total) + - Wall-clock duration + - Success or error status - A failed operation will not have any token usage data. + All LD analytics events fire regardless of OTel configuration. + If no OTel span is active and ``LDAIObserveConfig.create_span_if_none`` + is True (the default), an internal ``ld.ai.completion`` span is + created and exported automatically. - :param func: Function to track. - :return: Result of the tracked function. + :param func: Zero-argument callable that performs the LLM call. + :return: The ``ChatCompletion`` result returned by ``func``. """ + if not self._observe_config.annotate_spans: + return self._run_tracked(func) + + with _span_scope(create_if_none=self._observe_config.create_span_if_none): + annotate_span_with_ai_config_metadata( + self._config_key, + self._variation_key, + self._model_name, + self._provider_name, + version=self._version, + context_key=self._context.key, + enabled=self._enabled, + ) + return self._run_tracked(func) + + def _run_tracked(self, func): + """Execute func() while tracking duration, success/error, and tokens.""" start_time = time.time() try: result = func() - end_time = time.time() - duration = int((end_time - start_time) * 1000) - self.track_duration(duration) - self.track_success() - if hasattr(result, "usage") and hasattr(result.usage, "to_dict"): - self.track_tokens(_openai_to_token_usage(result.usage.to_dict())) except Exception: - end_time = time.time() - duration = int((end_time - start_time) * 1000) - self.track_duration(duration) + self.track_duration(int((time.time() - start_time) * 1000)) self.track_error() raise - + self.track_duration(int((time.time() - start_time) * 1000)) + self.track_success() + self._track_tokens_from_usage(getattr(result, "usage", None)) return result + def _track_tokens_from_usage(self, usage) -> None: + """Extract token counts from an OpenAI usage object and track them.""" + if usage is None: + return + data: Optional[Dict] = None + if hasattr(usage, "to_dict"): + data = usage.to_dict() + elif hasattr(usage, "model_dump"): + data = usage.model_dump() + if data: + self.track_tokens(_openai_to_token_usage(data)) + def track_bedrock_converse_metrics(self, res: dict) -> dict: """ Track AWS Bedrock conversation operations. @@ -350,6 +407,8 @@ def track_tokens(self, tokens: TokenUsage) -> None: :param tokens: Token usage data from either custom, OpenAI, or Bedrock sources. """ self._summary._usage = tokens + if self._observe_config.annotate_spans: + annotate_span_with_tokens(tokens.total, tokens.input, tokens.output) if tokens.total > 0: self._ld_client.track( "$ld:ai:tokens:total", diff --git a/packages/sdk/server-ai/tests/test_observe.py b/packages/sdk/server-ai/tests/test_observe.py new file mode 100644 index 0000000..cc097f7 --- /dev/null +++ b/packages/sdk/server-ai/tests/test_observe.py @@ -0,0 +1,243 @@ +""" +Tests for OTel span annotation and baggage propagation. + +These tests use the real opentelemetry-sdk (installed as a dev dependency) +to verify that LDAIConfigTracker correctly annotates spans and that +LDAIBaggageSpanProcessor correctly copies baggage to new spans. +""" +from unittest.mock import MagicMock, patch + +import pytest +from ldclient import Config, Context, LDClient +from ldclient.integrations.test_data import TestData + +from ldai.tracker import FeedbackKind, LDAIConfigTracker, TokenUsage + +# Skip all tests in this module when opentelemetry-sdk is not installed. +pytest.importorskip("opentelemetry.sdk.trace", reason="opentelemetry-sdk not installed") + +from opentelemetry import trace +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter +from opentelemetry.sdk.trace.export import SimpleSpanProcessor + +from ldai.observe import LDAIBaggageSpanProcessor, set_ai_config_baggage, detach_ai_config_baggage + + +@pytest.fixture +def td() -> TestData: + td = TestData.data_source() + td.update( + td.flag("model-config") + .variations( + { + "model": {"name": "fakeModel", "parameters": {}}, + "provider": {"name": "fakeProvider"}, + "messages": [{"role": "system", "content": "Hello!"}], + "_ldMeta": {"enabled": True, "variationKey": "abcd", "version": 1}, + }, + "green", + ) + .variation_for_all(0) + ) + return td + + +@pytest.fixture +def ld_client(td: TestData) -> LDClient: + config = Config("sdk-key", update_processor_class=td, send_events=False) + client = LDClient(config=config) + client.track = MagicMock() # type: ignore + return client + + +@pytest.fixture +def span_exporter(): + """Set up a local in-memory OTel provider and return (tracer, exporter). + + Uses a local TracerProvider rather than the global one so tests are + isolated from each other. Spans created via start_as_current_span() are + visible to trace.get_current_span() because OTel context propagation is + independent of the global provider. + """ + exporter = InMemorySpanExporter() + provider = TracerProvider() + provider.add_span_processor(SimpleSpanProcessor(exporter)) + tracer = provider.get_tracer("test") + yield tracer, exporter + exporter.clear() + + +@pytest.fixture +def exporter_with_baggage_processor(): + """Set up a local provider with LDAIBaggageSpanProcessor and in-memory exporter.""" + exporter = InMemorySpanExporter() + provider = TracerProvider() + provider.add_span_processor(LDAIBaggageSpanProcessor()) + provider.add_span_processor(SimpleSpanProcessor(exporter)) + tracer = provider.get_tracer("test") + yield tracer, exporter + exporter.clear() + + +# --------------------------------------------------------------------------- +# Tracker span annotation tests +# --------------------------------------------------------------------------- + +def test_track_tokens_annotates_active_span(ld_client, span_exporter): + tracer, exporter = span_exporter + context = Context.create("user-key") + tracker = LDAIConfigTracker(ld_client, "var-key", "config-key", 1, "fakeModel", "fakeProvider", context) + + with tracer.start_as_current_span("test-span"): + tracker.track_tokens(TokenUsage(total=300, input=200, output=100)) + + spans = exporter.get_finished_spans() + assert len(spans) == 1 + attrs = spans[0].attributes + assert attrs["ld.ai.metrics.tokens.total"] == 300 + assert attrs["ld.ai.metrics.tokens.input"] == 200 + assert attrs["ld.ai.metrics.tokens.output"] == 100 + + +def test_track_duration_annotates_active_span(ld_client, span_exporter): + tracer, exporter = span_exporter + context = Context.create("user-key") + tracker = LDAIConfigTracker(ld_client, "var-key", "config-key", 1, "fakeModel", "fakeProvider", context) + + with tracer.start_as_current_span("test-span"): + tracker.track_duration(250) + + spans = exporter.get_finished_spans() + assert spans[0].attributes["ld.ai.metrics.duration_ms"] == 250 + + +def test_track_ttft_annotates_active_span(ld_client, span_exporter): + tracer, exporter = span_exporter + context = Context.create("user-key") + tracker = LDAIConfigTracker(ld_client, "var-key", "config-key", 1, "fakeModel", "fakeProvider", context) + + with tracer.start_as_current_span("test-span"): + tracker.track_time_to_first_token(80) + + spans = exporter.get_finished_spans() + assert spans[0].attributes["ld.ai.metrics.time_to_first_token_ms"] == 80 + + +def test_track_success_sets_span_status_ok(ld_client, span_exporter): + from opentelemetry.trace import StatusCode + tracer, exporter = span_exporter + context = Context.create("user-key") + tracker = LDAIConfigTracker(ld_client, "var-key", "config-key", 1, "fakeModel", "fakeProvider", context) + + with tracer.start_as_current_span("test-span"): + tracker.track_success() + + spans = exporter.get_finished_spans() + assert spans[0].status.status_code == StatusCode.OK + + +def test_track_error_sets_span_status_error(ld_client, span_exporter): + from opentelemetry.trace import StatusCode + tracer, exporter = span_exporter + context = Context.create("user-key") + tracker = LDAIConfigTracker(ld_client, "var-key", "config-key", 1, "fakeModel", "fakeProvider", context) + + with tracer.start_as_current_span("test-span"): + tracker.track_error() + + spans = exporter.get_finished_spans() + assert spans[0].status.status_code == StatusCode.ERROR + + +def test_track_feedback_annotates_active_span(ld_client, span_exporter): + tracer, exporter = span_exporter + context = Context.create("user-key") + tracker = LDAIConfigTracker(ld_client, "var-key", "config-key", 1, "fakeModel", "fakeProvider", context) + + with tracer.start_as_current_span("test-span"): + tracker.track_feedback({"kind": FeedbackKind.Positive}) + + spans = exporter.get_finished_spans() + assert spans[0].attributes["ld.ai.metrics.feedback.kind"] == "positive" + + +def test_tracker_no_op_without_active_span(ld_client, span_exporter): + """Tracker methods must not raise when no OTel span is active.""" + context = Context.create("user-key") + tracker = LDAIConfigTracker(ld_client, "var-key", "config-key", 1, "fakeModel", "fakeProvider", context) + + # These must all succeed silently with no active span. + tracker.track_tokens(TokenUsage(total=100, input=60, output=40)) + tracker.track_duration(100) + tracker.track_time_to_first_token(50) + tracker.track_success() + tracker.track_error() + tracker.track_feedback({"kind": FeedbackKind.Negative}) + + exporter = span_exporter[1] + assert len(exporter.get_finished_spans()) == 0 + + +# --------------------------------------------------------------------------- +# LDAIBaggageSpanProcessor tests +# --------------------------------------------------------------------------- + +def test_baggage_processor_stamps_config_key_on_child_span(exporter_with_baggage_processor): + tracer, exporter = exporter_with_baggage_processor + + _, token = set_ai_config_baggage( + config_key="my-config", + variation_key="var-abc", + model_name="gpt-4o", + provider_name="openai", + ) + try: + with tracer.start_as_current_span("root-span"): + with tracer.start_as_current_span("llm-span"): + pass + finally: + detach_ai_config_baggage(token) + + spans = exporter.get_finished_spans() + llm_span = next(s for s in spans if s.name == "llm-span") + assert llm_span.attributes["ld.ai_config.key"] == "my-config" + assert llm_span.attributes["ld.ai_config.variation_key"] == "var-abc" + assert llm_span.attributes["ld.ai_config.model"] == "gpt-4o" + assert llm_span.attributes["ld.ai_config.provider"] == "openai" + + +def test_baggage_processor_does_not_stamp_spans_outside_scope(exporter_with_baggage_processor): + tracer, exporter = exporter_with_baggage_processor + + _, token = set_ai_config_baggage("my-config", "var-abc", "gpt-4o", "openai") + try: + with tracer.start_as_current_span("inside-span"): + pass + finally: + detach_ai_config_baggage(token) + + # This span starts after detach; it must not carry AI Config attributes. + with tracer.start_as_current_span("outside-span"): + pass + + spans = exporter.get_finished_spans() + outside = next(s for s in spans if s.name == "outside-span") + assert "ld.ai_config.key" not in (outside.attributes or {}) + + +def test_baggage_processor_skips_missing_model_and_provider(exporter_with_baggage_processor): + tracer, exporter = exporter_with_baggage_processor + + _, token = set_ai_config_baggage("cfg", "v1", "", "") + try: + with tracer.start_as_current_span("span"): + pass + finally: + detach_ai_config_baggage(token) + + spans = exporter.get_finished_spans() + attrs = spans[0].attributes or {} + assert attrs["ld.ai_config.key"] == "cfg" + assert "ld.ai_config.model" not in attrs + assert "ld.ai_config.provider" not in attrs