diff --git a/packages/sdk/server-ai/pyproject.toml b/packages/sdk/server-ai/pyproject.toml index b7a868b..4a445fa 100644 --- a/packages/sdk/server-ai/pyproject.toml +++ b/packages/sdk/server-ai/pyproject.toml @@ -27,6 +27,14 @@ packages = [{ include = "ldai", from = "src" }] python = ">=3.9,<4" launchdarkly-server-sdk = ">=9.4.0" chevron = "=0.14.0" +opentelemetry-api = {version = ">=1.0.0", optional = true} + +[tool.poetry.extras] +# Install with: pip install launchdarkly-server-sdk-ai[otel] +# Enables span annotation in LDAIConfigTracker and the config_scope() context +# manager on LDAIClient. LDAIBaggageSpanProcessor additionally requires +# opentelemetry-sdk to be installed by the application. +otel = ["opentelemetry-api"] [tool.poetry.group.dev.dependencies] @@ -37,6 +45,8 @@ pytest-asyncio = ">=0.21.0" mypy = "==1.18.2" pycodestyle = "^2.12.1" isort = ">=5.13.2,<7.0.0" +opentelemetry-api = "^1.40.0" +opentelemetry-sdk = "^1.40.0" [tool.poetry.group.docs] diff --git a/packages/sdk/server-ai/src/ldai/__init__.py b/packages/sdk/server-ai/src/ldai/__init__.py index cdd7a00..24cf4d7 100644 --- a/packages/sdk/server-ai/src/ldai/__init__.py +++ b/packages/sdk/server-ai/src/ldai/__init__.py @@ -13,6 +13,7 @@ Edge, JudgeConfiguration, LDAIAgent, LDAIAgentConfig, LDAIAgentDefaults, LDMessage, ModelConfig, ProviderConfig) from ldai.providers.types import EvalScore, JudgeResponse +from ldai.observe import LDAIBaggageSpanProcessor, LDAIObserveConfig from ldai.tracker import AIGraphTracker __all__ = [ @@ -23,6 +24,8 @@ 'AIAgents', 'AIAgentGraphConfig', 'AIGraphTracker', + 'LDAIBaggageSpanProcessor', + 'LDAIObserveConfig', 'Edge', 'AICompletionConfig', 'AICompletionConfigDefault', diff --git a/packages/sdk/server-ai/src/ldai/chat/__init__.py b/packages/sdk/server-ai/src/ldai/chat/__init__.py index c826fed..4201602 100644 --- a/packages/sdk/server-ai/src/ldai/chat/__init__.py +++ b/packages/sdk/server-ai/src/ldai/chat/__init__.py @@ -6,6 +6,7 @@ from ldai import log from 
ldai.judge import Judge from ldai.models import AICompletionConfig, LDMessage +from ldai.observe import _span_scope, annotate_span_with_ai_config_metadata from ldai.providers.ai_provider import AIProvider from ldai.providers.types import ChatResponse, JudgeResponse from ldai.tracker import LDAIConfigTracker @@ -50,29 +51,44 @@ async def invoke(self, prompt: str) -> ChatResponse: :param prompt: The user prompt to send to the chat model :return: ChatResponse containing the model's response and metrics """ - # Convert prompt string to LDMessage with role 'user' and add to conversation history user_message: LDMessage = LDMessage(role='user', content=prompt) self._messages.append(user_message) - # Prepend config messages to conversation history for model invocation config_messages = self._ai_config.messages or [] all_messages = config_messages + self._messages - # Delegate to provider-specific implementation with tracking - response = await self._tracker.track_metrics_of( - lambda: self._provider.invoke_model(all_messages), - lambda result: result.metrics, - ) - - # Start judge evaluations as async tasks (don't await them) - if ( - self._ai_config.judge_configuration - and self._ai_config.judge_configuration.judges - and len(self._ai_config.judge_configuration.judges) > 0 - ): - response.evaluations = self._start_judge_evaluations(self._messages, response) - - # Add the response message to conversation history + observe_config = self._tracker._observe_config + create_if_none = observe_config.annotate_spans and observe_config.create_span_if_none + + # Open (or reuse) a span for the full invoke — LLM call AND judge task + # creation must happen inside this block so that asyncio.create_task() + # captures the active span in its context copy. Judge spans created + # later in those tasks will then be correctly parented to this span. 
+ with _span_scope("ld.ai.completion", create_if_none=create_if_none): + if observe_config.annotate_spans: + annotate_span_with_ai_config_metadata( + self._ai_config.key, + self._tracker._variation_key, + self._tracker._model_name, + self._tracker._provider_name, + version=self._tracker._version, + context_key=self._tracker._context.key, + enabled=self._tracker._enabled, + ) + + response = await self._tracker.track_metrics_of( + lambda: self._provider.invoke_model(all_messages), + lambda result: result.metrics, + ) + + # Create judge tasks INSIDE the span scope so asyncio.create_task() + # snapshots the context while the completion span is still active. + if ( + self._ai_config.judge_configuration + and self._ai_config.judge_configuration.judges + ): + response.evaluations = self._start_judge_evaluations(self._messages, response) + self._messages.append(response.message) return response @@ -113,9 +129,18 @@ async def evaluate_judge(judge_config): return eval_result + observe_config = self._tracker._observe_config + create_judge_span = observe_config.annotate_spans and observe_config.create_span_if_none + + async def evaluate_judge_with_span(judge_config): + # Open the ld.ai.judge span BEFORE the judge LLM call so the + # judge's openai.chat span is nested inside it, not beside it. 
+ with _span_scope("ld.ai.judge", create_if_none=create_judge_span): + return await evaluate_judge(judge_config) + # Create tasks for each judge evaluation tasks = [ - asyncio.create_task(evaluate_judge(judge_config)) + asyncio.create_task(evaluate_judge_with_span(judge_config)) for judge_config in judge_configs ] diff --git a/packages/sdk/server-ai/src/ldai/client.py b/packages/sdk/server-ai/src/ldai/client.py index 8289d06..38d7c62 100644 --- a/packages/sdk/server-ai/src/ldai/client.py +++ b/packages/sdk/server-ai/src/ldai/client.py @@ -1,10 +1,12 @@ -from typing import Any, Dict, List, Optional, Tuple +from contextlib import contextmanager +from typing import Any, Dict, Generator, List, Optional, Tuple import chevron from ldclient import Context from ldclient.client import LDClient from ldai import log +from ldai.observe import LDAIObserveConfig, detach_ai_config_baggage, set_ai_config_baggage from ldai.agent_graph import AgentGraphDefinition from ldai.chat import Chat from ldai.judge import Judge @@ -32,8 +34,9 @@ class LDAIClient: """The LaunchDarkly AI SDK client object.""" - def __init__(self, client: LDClient): + def __init__(self, client: LDClient, observe: Optional[LDAIObserveConfig] = None): self._client = client + self._observe_config = observe if observe is not None else LDAIObserveConfig() self._client.track( _TRACK_SDK_INFO, _INIT_TRACK_CONTEXT, @@ -91,6 +94,60 @@ def completion_config( key, context, default or AICompletionConfigDefault.disabled(), variables ) + @contextmanager + def config_scope( + self, + key: str, + context: Context, + default: Optional[AICompletionConfigDefault] = None, + variables: Optional[Dict[str, Any]] = None, + ) -> Generator[AICompletionConfig, None, None]: + """ + Context manager that evaluates an AI Config and scopes its metadata to + the OTel context for the duration of the block. 
+ + While inside the block, any OTel span that is started (including spans + created automatically by OpenLLMetry or other auto-instrumentation) will + have the AI Config key, variation key, model, and provider stamped on it + as span attributes by LDAIBaggageSpanProcessor, if that processor is + registered. + + This solves the context propagation problem: when completion_config() is + called at one point in the code and the LLM call happens later, deep in + the call stack, the baggage propagates automatically so the two can be + correlated in LaunchDarkly. + + Example:: + + with aiclient.config_scope("my-ai-config", context) as config: + if config.enabled: + # LLM call can be anywhere inside this block, even in a + # helper function several layers down. OpenLLMetry's + # auto-instrumented span will carry ld.ai_config.key. + response = openai_client.chat.completions.create( + model=config.model.name, + messages=build_messages(config.messages, history), + ) + config.tracker.track_openai_metrics(lambda: response) + + :param key: The key of the completion configuration. + :param context: The context to evaluate the completion configuration in. + :param default: The default value of the completion configuration. + :param variables: Additional variables for the completion configuration. + :return: Generator yielding the evaluated AICompletionConfig. 
+ """ + config = self.completion_config(key, context, default, variables) + + model_name = config.model.name if config.model else "" + provider_name = config.provider.name if config.provider else "" + variation_key = config.tracker._variation_key if config.tracker else "" + + _, token = set_ai_config_baggage(key, variation_key, model_name, provider_name) + try: + yield config + finally: + detach_ai_config_baggage(token) + def config( self, key: str, @@ -661,18 +718,21 @@ def __evaluate( custom=custom ) + ld_meta = variation.get('_ldMeta', {}) + enabled = ld_meta.get('enabled', False) + tracker = LDAIConfigTracker( self._client, - variation.get('_ldMeta', {}).get('variationKey', ''), + ld_meta.get('variationKey', ''), key, - int(variation.get('_ldMeta', {}).get('version', 1)), + int(ld_meta.get('version', 1)), model.name if model else '', provider_config.name if provider_config else '', context, + observe_config=self._observe_config, + enabled=bool(enabled), ) - enabled = variation.get('_ldMeta', {}).get('enabled', False) - judge_configuration = None if 'judgeConfiguration' in variation and isinstance(variation['judgeConfiguration'], dict): judge_config = variation['judgeConfiguration'] diff --git a/packages/sdk/server-ai/src/ldai/observe.py b/packages/sdk/server-ai/src/ldai/observe.py new file mode 100644 index 0000000..930984b --- /dev/null +++ b/packages/sdk/server-ai/src/ldai/observe.py @@ -0,0 +1,409 @@ +""" +LLM observability integration for the LaunchDarkly AI Config SDK. + +This module provides: + +1. **LDAIObserveConfig** — developer-friendly dataclass that controls how the SDK + writes LLM metrics and AI Config metadata onto OpenTelemetry spans. 
+ Pass it to LDAIClient to opt in/out of features:: + + from ldai import LDAIClient + from ldai.observe import LDAIObserveConfig + + # defaults: annotate active spans, create an internal span when none exists + aiclient = LDAIClient(ld_client) + + # disable all span annotation (LD analytics events still fire) + aiclient = LDAIClient(ld_client, observe=LDAIObserveConfig(annotate_spans=False)) + + # annotate active spans only; don't create internal spans + aiclient = LDAIClient(ld_client, observe=LDAIObserveConfig(create_span_if_none=False)) + +2. **Span annotation helpers** — write LLM metrics (tokens, duration, success, + feedback) and AI Config metadata onto the currently active OTel span. + No-ops when opentelemetry-api is not installed. + +3. **LDAIBaggageSpanProcessor** — a SpanProcessor that copies LaunchDarkly AI + Config metadata from OTel baggage onto every new span. Useful when using + config_scope() with auto-instrumented LLM libraries (e.g. OpenLLMetry):: + + from opentelemetry.sdk.trace import TracerProvider + from ldai.observe import LDAIBaggageSpanProcessor + + provider = TracerProvider() + provider.add_span_processor(LDAIBaggageSpanProcessor()) + +All public symbols in this module are safe to call when opentelemetry-api is +not installed — they silently do nothing. LDAIBaggageSpanProcessor requires +opentelemetry-sdk. +""" + +from contextlib import contextmanager +from dataclasses import dataclass + +try: + from opentelemetry import baggage as _otel_baggage + from opentelemetry import context as _otel_context + from opentelemetry import trace as _otel_trace + from opentelemetry.trace import StatusCode + _OTEL_AVAILABLE = True +except ImportError: + _OTEL_AVAILABLE = False + StatusCode = None # type: ignore[assignment] + +# LDAIBaggageSpanProcessor subclasses SpanProcessor from the OTel SDK when +# available. When the SDK is not installed we fall back to object so the +# class can still be imported without error. 
+try: + from opentelemetry.sdk.trace import SpanProcessor as _SpanProcessorBase + _SDK_AVAILABLE = True +except ImportError: + _SpanProcessorBase = object # type: ignore[assignment,misc] + _SDK_AVAILABLE = False + + +# --------------------------------------------------------------------------- +# Developer-facing configuration +# --------------------------------------------------------------------------- + +@dataclass +class LDAIObserveConfig: + """ + Controls how the LaunchDarkly AI SDK writes observability data onto spans. + + Pass an instance to :class:`ldai.LDAIClient` at construction time:: + + from ldai import LDAIClient + from ldai.observe import LDAIObserveConfig + + # All defaults — recommended for most applications + aiclient = LDAIClient(ld_client) + + # Disable span annotation; LD analytics events still fire normally + aiclient = LDAIClient(ld_client, observe=LDAIObserveConfig(annotate_spans=False)) + + # Annotate existing spans only; don't create an internal span when + # no OTel span is active at call time + aiclient = LDAIClient(ld_client, observe=LDAIObserveConfig(create_span_if_none=False)) + + Attributes: + annotate_spans: When True (default), the SDK writes AI Config metadata + (key, variation, model, provider) and LLM metrics (token counts, + duration, success/error, feedback) as attributes onto the active + OTel span. Set to False to disable all span annotation while + keeping LaunchDarkly analytics tracking intact. + + create_span_if_none: When True (default) and ``annotate_spans`` is also + True, the SDK creates an internal ``ld.ai.completion`` span when no + OTel span is active at the time of the LLM call. The span is + exported through whatever ``TracerProvider`` is globally registered + (e.g. the LaunchDarkly Observability plugin). Set to False if you + only want to annotate spans you create yourself. 
+ """ + + annotate_spans: bool = True + create_span_if_none: bool = True + + +# --------------------------------------------------------------------------- +# Baggage key constants +# --------------------------------------------------------------------------- + +_BAGGAGE_CONFIG_KEY = "ld.ai_config.key" +_BAGGAGE_VARIATION_KEY = "ld.ai_config.variation_key" +_BAGGAGE_MODEL_KEY = "ld.ai_config.model" +_BAGGAGE_PROVIDER_KEY = "ld.ai_config.provider" + +_INTERNAL_SPAN_NAME = "ld.ai.completion" +_TRACER_NAME = "launchdarkly-server-sdk-ai" + + +# --------------------------------------------------------------------------- +# Internal helpers +# --------------------------------------------------------------------------- + +def _get_recording_span(): + """Return the active OTel span if it is recording, otherwise None.""" + if not _OTEL_AVAILABLE: + return None + span = _otel_trace.get_current_span() + if span is None or not span.is_recording(): + return None + return span + + +@contextmanager +def _span_scope(name: str = _INTERNAL_SPAN_NAME, create_if_none: bool = True): + """ + Context manager that ensures an active recording span for its duration. + + - If a recording span already exists it is yielded as-is (no new span). + - If no recording span exists and ``create_if_none`` is True, an internal + span is created via the global TracerProvider and made current. + - Otherwise yields None; all annotation calls inside will be no-ops. + + Requires opentelemetry-sdk when creating a new span; safe to call when + only opentelemetry-api is installed (falls back to yield None). 
+ """ + span = _get_recording_span() + if span is not None: + yield span + elif create_if_none and _SDK_AVAILABLE and _OTEL_AVAILABLE: + tracer = _otel_trace.get_tracer(_TRACER_NAME) + with tracer.start_as_current_span(name) as new_span: + yield new_span + else: + yield None + + +# --------------------------------------------------------------------------- +# Span annotation helpers (called by LDAIConfigTracker) +# --------------------------------------------------------------------------- + +def annotate_span_with_ai_config_metadata( + config_key: str, + variation_key: str, + model_name: str, + provider_name: str, + version: int = 0, + context_key: str = "", + enabled: bool = True, +) -> None: + """ + Write AI Config identity attributes onto the currently active OTel span. + + Attributes written: + ld.ai_config.key — AI Config flag key + ld.ai_config.variation_key — evaluated variation key + ld.ai_config.version — variation version (omitted when 0) + ld.ai_config.context_key — LaunchDarkly context key (omitted when empty) + ld.ai_config.enabled — whether the AI Config is enabled (mode) + ld.ai_config.model — model name (omitted when empty) + ld.ai_config.provider — provider name (omitted when empty) + + No-op when opentelemetry-api is not installed or no recording span is active. + """ + span = _get_recording_span() + if span is None: + return + span.set_attribute("ld.ai_config.key", config_key) + span.set_attribute("ld.ai_config.variation_key", variation_key) + if version: + span.set_attribute("ld.ai_config.version", version) + if context_key: + span.set_attribute("ld.ai_config.context_key", context_key) + span.set_attribute("ld.ai_config.enabled", enabled) + if model_name: + span.set_attribute("ld.ai_config.model", model_name) + if provider_name: + span.set_attribute("ld.ai_config.provider", provider_name) + + +def annotate_span_with_tokens(total: int, input_tokens: int, output_tokens: int) -> None: + """ + Write token usage attributes onto the currently active OTel span. 
+ + ld.ai.metrics.tokens.total — total token count + ld.ai.metrics.tokens.input — prompt / input tokens + ld.ai.metrics.tokens.output — completion / output tokens + + No-op when opentelemetry-api is not installed or no recording span is active. + """ + span = _get_recording_span() + if span is None: + return + if total > 0: + span.set_attribute("ld.ai.metrics.tokens.total", total) + if input_tokens > 0: + span.set_attribute("ld.ai.metrics.tokens.input", input_tokens) + if output_tokens > 0: + span.set_attribute("ld.ai.metrics.tokens.output", output_tokens) + + +def annotate_span_with_duration(duration_ms: int) -> None: + """ + Write ``ld.ai.metrics.duration_ms`` onto the currently active OTel span. + + No-op when opentelemetry-api is not installed or no recording span is active. + """ + span = _get_recording_span() + if span is None: + return + span.set_attribute("ld.ai.metrics.duration_ms", duration_ms) + + +def annotate_span_with_ttft(ttft_ms: int) -> None: + """ + Write ``ld.ai.metrics.time_to_first_token_ms`` onto the currently active OTel span. + + No-op when opentelemetry-api is not installed or no recording span is active. + """ + span = _get_recording_span() + if span is None: + return + span.set_attribute("ld.ai.metrics.time_to_first_token_ms", ttft_ms) + + +def annotate_span_success(success: bool) -> None: + """ + Set the active span status to OK or ERROR. + + No-op when opentelemetry-api is not installed or no recording span is active. + """ + if not _OTEL_AVAILABLE: + return + span = _get_recording_span() + if span is None: + return + span.set_status(StatusCode.OK if success else StatusCode.ERROR) + + +def annotate_span_with_feedback(kind: str) -> None: + """ + Write ``ld.ai.metrics.feedback.kind`` onto the currently active OTel span. + + No-op when opentelemetry-api is not installed or no recording span is active. 
+ """ + span = _get_recording_span() + if span is None: + return + span.set_attribute("ld.ai.metrics.feedback.kind", kind) + + +def annotate_span_with_judge_response(judge_response) -> None: + """ + Write judge evaluation results onto the currently active OTel span. + + For each eval in the response, up to two attributes are written using the + sanitized metric key as a namespace: + + ld.ai.judge.{metric}.score — numeric score between 0 and 1 + ld.ai.judge.{metric}.reasoning — reasoning text (only when present) + + Plus top-level judge attributes: + + ld.ai.judge.config_key — key of the judge AI Config + ld.ai.judge.success — whether the evaluation completed successfully + ld.ai.judge.error — error message (only when evaluation failed) + + Metric keys like ``$ld:ai:judge:relevance`` are sanitized to + ``relevance`` (``$`` stripped, ``:``-separated segments, last segment used). + + No-op when opentelemetry-api is not installed or no recording span is active. + """ + span = _get_recording_span() + if span is None: + return + + if judge_response.judge_config_key: + span.set_attribute("ld.ai.judge.config_key", judge_response.judge_config_key) + span.set_attribute("ld.ai.judge.success", judge_response.success) + if judge_response.error: + span.set_attribute("ld.ai.judge.error", judge_response.error) + + for metric_key, eval_score in (judge_response.evals or {}).items(): + # Sanitize metric key: strip leading '$', use last ':'-separated segment + clean = metric_key.lstrip("$").split(":")[-1] if metric_key else metric_key + span.set_attribute(f"ld.ai.judge.{clean}.score", eval_score.score) + if eval_score.reasoning: + span.set_attribute(f"ld.ai.judge.{clean}.reasoning", eval_score.reasoning) + + +# --------------------------------------------------------------------------- +# Baggage helpers (used by LDAIClient.config_scope()) +# --------------------------------------------------------------------------- + +def set_ai_config_baggage( + config_key: str, + variation_key: str, + model_name: str, + provider_name: 
str, +): + """ + Attach AI Config metadata to the active OTel context via baggage. + + Returns ``(ctx, token)``. The token must be passed to + :func:`detach_ai_config_baggage` to clean up. Returns ``(None, None)`` + when opentelemetry-api is not installed. + """ + if not _OTEL_AVAILABLE: + return None, None + + ctx = _otel_baggage.set_baggage(_BAGGAGE_CONFIG_KEY, config_key) + ctx = _otel_baggage.set_baggage(_BAGGAGE_VARIATION_KEY, variation_key, context=ctx) + if model_name: + ctx = _otel_baggage.set_baggage(_BAGGAGE_MODEL_KEY, model_name, context=ctx) + if provider_name: + ctx = _otel_baggage.set_baggage(_BAGGAGE_PROVIDER_KEY, provider_name, context=ctx) + + token = _otel_context.attach(ctx) + return ctx, token + + +def detach_ai_config_baggage(token) -> None: + """ + Remove AI Config baggage from the OTel context. + + No-op when opentelemetry-api is not installed or token is None. + """ + if not _OTEL_AVAILABLE or token is None: + return + _otel_context.detach(token) + + +# --------------------------------------------------------------------------- +# LDAIBaggageSpanProcessor +# --------------------------------------------------------------------------- + +class LDAIBaggageSpanProcessor(_SpanProcessorBase): + """ + An OTel SpanProcessor that copies LaunchDarkly AI Config metadata from + OTel baggage onto every new span as span attributes. + + Useful when using :meth:`LDAIClient.config_scope` together with + auto-instrumented LLM libraries (e.g. OpenLLMetry), so that spans created + inside the scope automatically carry AI Config metadata. 
+ + Baggage key -> Span attribute + ld.ai_config.key -> ld.ai_config.key + ld.ai_config.variation_key -> ld.ai_config.variation_key + ld.ai_config.model -> ld.ai_config.model + ld.ai_config.provider -> ld.ai_config.provider + + Register once at application startup:: + + from opentelemetry.sdk.trace import TracerProvider + from ldai.observe import LDAIBaggageSpanProcessor + + provider = TracerProvider() + provider.add_span_processor(LDAIBaggageSpanProcessor()) + trace.set_tracer_provider(provider) + + Requires opentelemetry-sdk (not just opentelemetry-api). + """ + + _BAGGAGE_TO_ATTRIBUTE = { + _BAGGAGE_CONFIG_KEY: "ld.ai_config.key", + _BAGGAGE_VARIATION_KEY: "ld.ai_config.variation_key", + _BAGGAGE_MODEL_KEY: "ld.ai_config.model", + _BAGGAGE_PROVIDER_KEY: "ld.ai_config.provider", + } + + def on_start(self, span, parent_context=None): + """Copy LD AI Config baggage entries onto the starting span as attributes.""" + if not _OTEL_AVAILABLE: + return + ctx = parent_context if parent_context is not None else _otel_context.get_current() + for baggage_key, attr_key in self._BAGGAGE_TO_ATTRIBUTE.items(): + value = _otel_baggage.get_baggage(baggage_key, context=ctx) + if value: + span.set_attribute(attr_key, value) + + def on_end(self, span): + pass + + def shutdown(self): + pass + + def force_flush(self, timeout_millis: int = 30000): + pass diff --git a/packages/sdk/server-ai/src/ldai/otel.py b/packages/sdk/server-ai/src/ldai/otel.py new file mode 100644 index 0000000..b9a2456 --- /dev/null +++ b/packages/sdk/server-ai/src/ldai/otel.py @@ -0,0 +1,20 @@ +""" +Backward-compatibility shim — import from ldai.observe instead. + +LDAIOtelConfig is a deprecated alias for LDAIObserveConfig. 
+""" + +from ldai.observe import ( # noqa: F401 + LDAIObserveConfig as LDAIOtelConfig, + LDAIBaggageSpanProcessor, + annotate_span_with_ai_config_metadata, + annotate_span_with_tokens, + annotate_span_with_duration, + annotate_span_with_ttft, + annotate_span_success, + annotate_span_with_feedback, + set_ai_config_baggage, + detach_ai_config_baggage, + _span_scope, + _get_recording_span, +) diff --git a/packages/sdk/server-ai/src/ldai/tracker.py b/packages/sdk/server-ai/src/ldai/tracker.py index d4f0912..3295920 100644 --- a/packages/sdk/server-ai/src/ldai/tracker.py +++ b/packages/sdk/server-ai/src/ldai/tracker.py @@ -5,6 +5,18 @@ from ldclient import Context, LDClient +from ldai.observe import ( + LDAIObserveConfig, + _span_scope, + annotate_span_success, + annotate_span_with_ai_config_metadata, + annotate_span_with_duration, + annotate_span_with_feedback, + annotate_span_with_judge_response, + annotate_span_with_tokens, + annotate_span_with_ttft, +) + class FeedbackKind(Enum): """ @@ -77,6 +89,8 @@ def __init__( model_name: str, provider_name: str, context: Context, + observe_config: Optional[LDAIObserveConfig] = None, + enabled: bool = True, ): """ Initialize an AI Config tracker. @@ -88,6 +102,8 @@ def __init__( :param model_name: Name of the model used. :param provider_name: Name of the provider used. :param context: Context for evaluation. + :param observe_config: OTel integration options (defaults to LDAIObserveConfig()). + :param enabled: Whether the AI Config variation is enabled (mode). 
""" self._ld_client = ld_client self._variation_key = variation_key @@ -96,6 +112,8 @@ def __init__( self._model_name = model_name self._provider_name = provider_name self._context = context + self._observe_config = observe_config if observe_config is not None else LDAIObserveConfig() + self._enabled = enabled self._summary = LDAIMetricSummary() def __get_track_data(self): @@ -119,6 +137,8 @@ def track_duration(self, duration: int) -> None: :param duration: Duration in milliseconds. """ self._summary._duration = duration + if self._observe_config.annotate_spans: + annotate_span_with_duration(duration) self._ld_client.track( "$ld:ai:duration:total", self._context, self.__get_track_data(), duration ) @@ -130,6 +150,8 @@ def track_time_to_first_token(self, time_to_first_token: int) -> None: :param time_to_first_token: Time to first token in milliseconds. """ self._summary._time_to_first_token = time_to_first_token + if self._observe_config.annotate_spans: + annotate_span_with_ttft(time_to_first_token) self._ld_client.track( "$ld:ai:tokens:ttf", self._context, @@ -231,7 +253,10 @@ def track_judge_response(self, judge_response: Any) -> None: from ldai.providers.types import EvalScore, JudgeResponse if isinstance(judge_response, JudgeResponse): - # Track evaluation scores with judge config key included in metadata + if self._observe_config.annotate_spans: + with _span_scope("ld.ai.judge", create_if_none=self._observe_config.create_span_if_none): + annotate_span_with_judge_response(judge_response) + if judge_response.evals: track_data = self.__get_track_data() if judge_response.judge_config_key: @@ -253,6 +278,8 @@ def track_feedback(self, feedback: Dict[str, FeedbackKind]) -> None: :param feedback: Dictionary containing feedback kind. 
""" self._summary._feedback = feedback + if self._observe_config.annotate_spans: + annotate_span_with_feedback(feedback["kind"].value) if feedback["kind"] == FeedbackKind.Positive: self._ld_client.track( "$ld:ai:feedback:user:positive", @@ -273,6 +300,8 @@ def track_success(self) -> None: Track a successful AI generation. """ self._summary._success = True + if self._observe_config.annotate_spans: + annotate_span_success(True) self._ld_client.track( "$ld:ai:generation:success", self._context, self.__get_track_data(), 1 ) @@ -282,45 +311,73 @@ def track_error(self) -> None: Track an unsuccessful AI generation attempt. """ self._summary._success = False + if self._observe_config.annotate_spans: + annotate_span_success(False) self._ld_client.track( "$ld:ai:generation:error", self._context, self.__get_track_data(), 1 ) def track_openai_metrics(self, func): """ - Track OpenAI-specific operations. - - This function will track the duration of the operation, the token - usage, and the success or error status. + Track an OpenAI chat completion call end-to-end. - If the provided function throws, then this method will also throw. + Wraps ``func`` (a zero-argument callable that returns an OpenAI + ``ChatCompletion`` response) and automatically records: - In the case the provided function throws, this function will record the - duration and an error. + - AI Config metadata on the active span (key, variation, model, provider) + - Token usage (prompt, completion, total) + - Wall-clock duration + - Success or error status - A failed operation will not have any token usage data. + All LD analytics events fire regardless of OTel configuration. + If no OTel span is active and ``LDAIObserveConfig.create_span_if_none`` + is True (the default), an internal ``ld.ai.completion`` span is + created and exported automatically. - :param func: Function to track. - :return: Result of the tracked function. + :param func: Zero-argument callable that performs the LLM call. 
+ :return: The ``ChatCompletion`` result returned by ``func``. """ + if not self._observe_config.annotate_spans: + return self._run_tracked(func) + + with _span_scope(create_if_none=self._observe_config.create_span_if_none): + annotate_span_with_ai_config_metadata( + self._config_key, + self._variation_key, + self._model_name, + self._provider_name, + version=self._version, + context_key=self._context.key, + enabled=self._enabled, + ) + return self._run_tracked(func) + + def _run_tracked(self, func): + """Execute func() while tracking duration, success/error, and tokens.""" start_time = time.time() try: result = func() - end_time = time.time() - duration = int((end_time - start_time) * 1000) - self.track_duration(duration) - self.track_success() - if hasattr(result, "usage") and hasattr(result.usage, "to_dict"): - self.track_tokens(_openai_to_token_usage(result.usage.to_dict())) except Exception: - end_time = time.time() - duration = int((end_time - start_time) * 1000) - self.track_duration(duration) + self.track_duration(int((time.time() - start_time) * 1000)) self.track_error() raise - + self.track_duration(int((time.time() - start_time) * 1000)) + self.track_success() + self._track_tokens_from_usage(getattr(result, "usage", None)) return result + def _track_tokens_from_usage(self, usage) -> None: + """Extract token counts from an OpenAI usage object and track them.""" + if usage is None: + return + data: Optional[Dict] = None + if hasattr(usage, "to_dict"): + data = usage.to_dict() + elif hasattr(usage, "model_dump"): + data = usage.model_dump() + if data: + self.track_tokens(_openai_to_token_usage(data)) + def track_bedrock_converse_metrics(self, res: dict) -> dict: """ Track AWS Bedrock conversation operations. @@ -350,6 +407,8 @@ def track_tokens(self, tokens: TokenUsage) -> None: :param tokens: Token usage data from either custom, OpenAI, or Bedrock sources. 
""" self._summary._usage = tokens + if self._observe_config.annotate_spans: + annotate_span_with_tokens(tokens.total, tokens.input, tokens.output) if tokens.total > 0: self._ld_client.track( "$ld:ai:tokens:total", diff --git a/packages/sdk/server-ai/tests/test_observe.py b/packages/sdk/server-ai/tests/test_observe.py new file mode 100644 index 0000000..cc097f7 --- /dev/null +++ b/packages/sdk/server-ai/tests/test_observe.py @@ -0,0 +1,243 @@ +""" +Tests for OTel span annotation and baggage propagation. + +These tests use the real opentelemetry-sdk (installed as a dev dependency) +to verify that LDAIConfigTracker correctly annotates spans and that +LDAIBaggageSpanProcessor correctly copies baggage to new spans. +""" +from unittest.mock import MagicMock, patch + +import pytest +from ldclient import Config, Context, LDClient +from ldclient.integrations.test_data import TestData + +from ldai.tracker import FeedbackKind, LDAIConfigTracker, TokenUsage + +# Skip all tests in this module when opentelemetry-sdk is not installed. 
+pytest.importorskip("opentelemetry.sdk.trace", reason="opentelemetry-sdk not installed") + +from opentelemetry import trace +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter +from opentelemetry.sdk.trace.export import SimpleSpanProcessor + +from ldai.observe import LDAIBaggageSpanProcessor, set_ai_config_baggage, detach_ai_config_baggage + + +@pytest.fixture +def td() -> TestData: + td = TestData.data_source() + td.update( + td.flag("model-config") + .variations( + { + "model": {"name": "fakeModel", "parameters": {}}, + "provider": {"name": "fakeProvider"}, + "messages": [{"role": "system", "content": "Hello!"}], + "_ldMeta": {"enabled": True, "variationKey": "abcd", "version": 1}, + }, + "green", + ) + .variation_for_all(0) + ) + return td + + +@pytest.fixture +def ld_client(td: TestData) -> LDClient: + config = Config("sdk-key", update_processor_class=td, send_events=False) + client = LDClient(config=config) + client.track = MagicMock() # type: ignore + return client + + +@pytest.fixture +def span_exporter(): + """Set up a local in-memory OTel provider and return (tracer, exporter). + + Uses a local TracerProvider rather than the global one so tests are + isolated from each other. Spans created via start_as_current_span() are + visible to trace.get_current_span() because OTel context propagation is + independent of the global provider. 
+ """ + exporter = InMemorySpanExporter() + provider = TracerProvider() + provider.add_span_processor(SimpleSpanProcessor(exporter)) + tracer = provider.get_tracer("test") + yield tracer, exporter + exporter.clear() + + +@pytest.fixture +def exporter_with_baggage_processor(): + """Set up a local provider with LDAIBaggageSpanProcessor and in-memory exporter.""" + exporter = InMemorySpanExporter() + provider = TracerProvider() + provider.add_span_processor(LDAIBaggageSpanProcessor()) + provider.add_span_processor(SimpleSpanProcessor(exporter)) + tracer = provider.get_tracer("test") + yield tracer, exporter + exporter.clear() + + +# --------------------------------------------------------------------------- +# Tracker span annotation tests +# --------------------------------------------------------------------------- + +def test_track_tokens_annotates_active_span(ld_client, span_exporter): + tracer, exporter = span_exporter + context = Context.create("user-key") + tracker = LDAIConfigTracker(ld_client, "var-key", "config-key", 1, "fakeModel", "fakeProvider", context) + + with tracer.start_as_current_span("test-span"): + tracker.track_tokens(TokenUsage(total=300, input=200, output=100)) + + spans = exporter.get_finished_spans() + assert len(spans) == 1 + attrs = spans[0].attributes + assert attrs["ld.ai.metrics.tokens.total"] == 300 + assert attrs["ld.ai.metrics.tokens.input"] == 200 + assert attrs["ld.ai.metrics.tokens.output"] == 100 + + +def test_track_duration_annotates_active_span(ld_client, span_exporter): + tracer, exporter = span_exporter + context = Context.create("user-key") + tracker = LDAIConfigTracker(ld_client, "var-key", "config-key", 1, "fakeModel", "fakeProvider", context) + + with tracer.start_as_current_span("test-span"): + tracker.track_duration(250) + + spans = exporter.get_finished_spans() + assert spans[0].attributes["ld.ai.metrics.duration_ms"] == 250 + + +def test_track_ttft_annotates_active_span(ld_client, span_exporter): + tracer, exporter = 
span_exporter + context = Context.create("user-key") + tracker = LDAIConfigTracker(ld_client, "var-key", "config-key", 1, "fakeModel", "fakeProvider", context) + + with tracer.start_as_current_span("test-span"): + tracker.track_time_to_first_token(80) + + spans = exporter.get_finished_spans() + assert spans[0].attributes["ld.ai.metrics.time_to_first_token_ms"] == 80 + + +def test_track_success_sets_span_status_ok(ld_client, span_exporter): + from opentelemetry.trace import StatusCode + tracer, exporter = span_exporter + context = Context.create("user-key") + tracker = LDAIConfigTracker(ld_client, "var-key", "config-key", 1, "fakeModel", "fakeProvider", context) + + with tracer.start_as_current_span("test-span"): + tracker.track_success() + + spans = exporter.get_finished_spans() + assert spans[0].status.status_code == StatusCode.OK + + +def test_track_error_sets_span_status_error(ld_client, span_exporter): + from opentelemetry.trace import StatusCode + tracer, exporter = span_exporter + context = Context.create("user-key") + tracker = LDAIConfigTracker(ld_client, "var-key", "config-key", 1, "fakeModel", "fakeProvider", context) + + with tracer.start_as_current_span("test-span"): + tracker.track_error() + + spans = exporter.get_finished_spans() + assert spans[0].status.status_code == StatusCode.ERROR + + +def test_track_feedback_annotates_active_span(ld_client, span_exporter): + tracer, exporter = span_exporter + context = Context.create("user-key") + tracker = LDAIConfigTracker(ld_client, "var-key", "config-key", 1, "fakeModel", "fakeProvider", context) + + with tracer.start_as_current_span("test-span"): + tracker.track_feedback({"kind": FeedbackKind.Positive}) + + spans = exporter.get_finished_spans() + assert spans[0].attributes["ld.ai.metrics.feedback.kind"] == "positive" + + +def test_tracker_no_op_without_active_span(ld_client, span_exporter): + """Tracker methods must not raise when no OTel span is active.""" + context = Context.create("user-key") + tracker = 
LDAIConfigTracker(ld_client, "var-key", "config-key", 1, "fakeModel", "fakeProvider", context) + + # These must all succeed silently with no active span. + tracker.track_tokens(TokenUsage(total=100, input=60, output=40)) + tracker.track_duration(100) + tracker.track_time_to_first_token(50) + tracker.track_success() + tracker.track_error() + tracker.track_feedback({"kind": FeedbackKind.Negative}) + + exporter = span_exporter[1] + assert len(exporter.get_finished_spans()) == 0 + + +# --------------------------------------------------------------------------- +# LDAIBaggageSpanProcessor tests +# --------------------------------------------------------------------------- + +def test_baggage_processor_stamps_config_key_on_child_span(exporter_with_baggage_processor): + tracer, exporter = exporter_with_baggage_processor + + _, token = set_ai_config_baggage( + config_key="my-config", + variation_key="var-abc", + model_name="gpt-4o", + provider_name="openai", + ) + try: + with tracer.start_as_current_span("root-span"): + with tracer.start_as_current_span("llm-span"): + pass + finally: + detach_ai_config_baggage(token) + + spans = exporter.get_finished_spans() + llm_span = next(s for s in spans if s.name == "llm-span") + assert llm_span.attributes["ld.ai_config.key"] == "my-config" + assert llm_span.attributes["ld.ai_config.variation_key"] == "var-abc" + assert llm_span.attributes["ld.ai_config.model"] == "gpt-4o" + assert llm_span.attributes["ld.ai_config.provider"] == "openai" + + +def test_baggage_processor_does_not_stamp_spans_outside_scope(exporter_with_baggage_processor): + tracer, exporter = exporter_with_baggage_processor + + _, token = set_ai_config_baggage("my-config", "var-abc", "gpt-4o", "openai") + try: + with tracer.start_as_current_span("inside-span"): + pass + finally: + detach_ai_config_baggage(token) + + # This span starts after detach; it must not carry AI Config attributes. 
+ with tracer.start_as_current_span("outside-span"): + pass + + spans = exporter.get_finished_spans() + outside = next(s for s in spans if s.name == "outside-span") + assert "ld.ai_config.key" not in (outside.attributes or {}) + + +def test_baggage_processor_skips_missing_model_and_provider(exporter_with_baggage_processor): + tracer, exporter = exporter_with_baggage_processor + + _, token = set_ai_config_baggage("cfg", "v1", "", "") + try: + with tracer.start_as_current_span("span"): + pass + finally: + detach_ai_config_baggage(token) + + spans = exporter.get_finished_spans() + attrs = spans[0].attributes or {} + assert attrs["ld.ai_config.key"] == "cfg" + assert "ld.ai_config.model" not in attrs + assert "ld.ai_config.provider" not in attrs