Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions packages/sdk/server-ai/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@ packages = [{ include = "ldai", from = "src" }]
python = ">=3.9,<4"
launchdarkly-server-sdk = ">=9.4.0"
chevron = "=0.14.0"
opentelemetry-api = {version = ">=1.0.0", optional = true}

[tool.poetry.extras]
# Install with: pip install launchdarkly-server-sdk-ai[otel]
# Enables span annotation in LDAIConfigTracker and the config_scope() context
# manager on LDAIClient. LDAIBaggageSpanProcessor additionally requires
# opentelemetry-sdk to be installed by the application.
otel = ["opentelemetry-api"]


[tool.poetry.group.dev.dependencies]
Expand All @@ -37,6 +45,8 @@ pytest-asyncio = ">=0.21.0"
mypy = "==1.18.2"
pycodestyle = "^2.12.1"
isort = ">=5.13.2,<7.0.0"
opentelemetry-api = "^1.40.0"
opentelemetry-sdk = "^1.40.0"


[tool.poetry.group.docs]
Expand Down
3 changes: 3 additions & 0 deletions packages/sdk/server-ai/src/ldai/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
Edge, JudgeConfiguration, LDAIAgent, LDAIAgentConfig, LDAIAgentDefaults,
LDMessage, ModelConfig, ProviderConfig)
from ldai.providers.types import EvalScore, JudgeResponse
from ldai.observe import LDAIBaggageSpanProcessor, LDAIObserveConfig
from ldai.tracker import AIGraphTracker

__all__ = [
Expand All @@ -23,6 +24,8 @@
'AIAgents',
'AIAgentGraphConfig',
'AIGraphTracker',
'LDAIBaggageSpanProcessor',
'LDAIObserveConfig',
'Edge',
'AICompletionConfig',
'AICompletionConfigDefault',
Expand Down
61 changes: 43 additions & 18 deletions packages/sdk/server-ai/src/ldai/chat/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from ldai import log
from ldai.judge import Judge
from ldai.models import AICompletionConfig, LDMessage
from ldai.observe import _span_scope, annotate_span_with_ai_config_metadata
from ldai.providers.ai_provider import AIProvider
from ldai.providers.types import ChatResponse, JudgeResponse
from ldai.tracker import LDAIConfigTracker
Expand Down Expand Up @@ -50,29 +51,44 @@ async def invoke(self, prompt: str) -> ChatResponse:
:param prompt: The user prompt to send to the chat model
:return: ChatResponse containing the model's response and metrics
"""
# Convert prompt string to LDMessage with role 'user' and add to conversation history
user_message: LDMessage = LDMessage(role='user', content=prompt)
self._messages.append(user_message)

# Prepend config messages to conversation history for model invocation
config_messages = self._ai_config.messages or []
all_messages = config_messages + self._messages

# Delegate to provider-specific implementation with tracking
response = await self._tracker.track_metrics_of(
lambda: self._provider.invoke_model(all_messages),
lambda result: result.metrics,
)

# Start judge evaluations as async tasks (don't await them)
if (
self._ai_config.judge_configuration
and self._ai_config.judge_configuration.judges
and len(self._ai_config.judge_configuration.judges) > 0
):
response.evaluations = self._start_judge_evaluations(self._messages, response)

# Add the response message to conversation history
observe_config = self._tracker._observe_config
create_if_none = observe_config.annotate_spans and observe_config.create_span_if_none

# Open (or reuse) a span for the full invoke — LLM call AND judge task
# creation must happen inside this block so that asyncio.create_task()
# captures the active span in its context copy. Judge spans created
# later in those tasks will then be correctly parented to this span.
with _span_scope("ld.ai.completion", create_if_none=create_if_none):
if observe_config.annotate_spans:
annotate_span_with_ai_config_metadata(
self._ai_config.key,
self._tracker._variation_key,
self._tracker._model_name,
self._tracker._provider_name,
version=self._tracker._version,
context_key=self._tracker._context.key,
enabled=self._tracker._enabled,
)

response = await self._tracker.track_metrics_of(
lambda: self._provider.invoke_model(all_messages),
lambda result: result.metrics,
)

# Create judge tasks INSIDE the span scope so asyncio.create_task()
# snapshots the context while the completion span is still active.
if (
self._ai_config.judge_configuration
and self._ai_config.judge_configuration.judges
):
response.evaluations = self._start_judge_evaluations(self._messages, response)

self._messages.append(response.message)
return response

Expand Down Expand Up @@ -113,9 +129,18 @@ async def evaluate_judge(judge_config):

return eval_result

observe_config = self._tracker._observe_config
create_judge_span = observe_config.annotate_spans and observe_config.create_span_if_none

async def evaluate_judge_with_span(judge_config):
# Open the ld.ai.judge span BEFORE the judge LLM call so the
# judge's openai.chat span is nested inside it, not beside it.
with _span_scope("ld.ai.judge", create_if_none=create_judge_span):
return await evaluate_judge(judge_config)

# Create tasks for each judge evaluation
tasks = [
asyncio.create_task(evaluate_judge(judge_config))
asyncio.create_task(evaluate_judge_with_span(judge_config))
for judge_config in judge_configs
]

Expand Down
72 changes: 66 additions & 6 deletions packages/sdk/server-ai/src/ldai/client.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
from typing import Any, Dict, List, Optional, Tuple
from contextlib import contextmanager
from typing import Any, Dict, Generator, List, Optional, Tuple

import chevron
from ldclient import Context
from ldclient.client import LDClient

from ldai import log
from ldai.observe import LDAIObserveConfig, detach_ai_config_baggage, set_ai_config_baggage
from ldai.agent_graph import AgentGraphDefinition
from ldai.chat import Chat
from ldai.judge import Judge
Expand Down Expand Up @@ -32,8 +34,9 @@
class LDAIClient:
"""The LaunchDarkly AI SDK client object."""

def __init__(self, client: LDClient):
def __init__(self, client: LDClient, observe: Optional[LDAIObserveConfig] = None):
self._client = client
self._observe_config = observe if observe is not None else LDAIObserveConfig()
self._client.track(
_TRACK_SDK_INFO,
_INIT_TRACK_CONTEXT,
Expand Down Expand Up @@ -91,6 +94,60 @@ def completion_config(
key, context, default or AICompletionConfigDefault.disabled(), variables
)

@contextmanager
def config_scope(
self,
key: str,
context: Context,
default: Optional[AICompletionConfigDefault] = None,
variables: Optional[Dict[str, Any]] = None,
) -> Generator[AICompletionConfig, None, None]:
"""
Context manager that evaluates an AI Config and scopes its metadata to
the OTel context for the duration of the block.

While inside the block, any OTel span that is started (including spans
created automatically by OpenLLMetry or other auto-instrumentation) will
have the AI Config key, variation key, model, and provider stamped on it
as span attributes by LDAIBaggageSpanProcessor, if that processor is
registered.

This solves the context propagation problem: when completion_config() is
called at one point in the code and the LLM call happens later, deep in
the call stack, the baggage propagates automatically so the two can be
correlated in LaunchDarkly.

Example::

with aiclient.config_scope("my-ai-config", context) as config:
if config.enabled:
# LLM call can be anywhere inside this block, even in a
# helper function several layers down. OpenLLMetry's
# auto-instrumented span will carry ld.ai_config.key.
response = openai_client.chat.completions.create(
model=config.model.name,
messages=build_messages(config.messages, history),
)
config.tracker.track_openai_metrics(lambda: response)

:param key: The key of the completion configuration.
:param context: The context to evaluate the completion configuration in.
:param default: The default value of the completion configuration.
:param variables: Additional variables for the completion configuration.
:return: Generator yielding the evaluated AICompletionConfig.
"""
config = self.completion_config(key, context, default, variables)

model_name = config.model.name if config.model else ""
provider_name = config.provider.name if config.provider else ""
variation_key = config.tracker._variation_key if config.tracker else ""

_, token = set_ai_config_baggage(key, variation_key, model_name, provider_name)
try:
yield config
finally:
detach_ai_config_baggage(token)

def config(
self,
key: str,
Expand Down Expand Up @@ -661,18 +718,21 @@ def __evaluate(
custom=custom
)

ld_meta = variation.get('_ldMeta', {})
enabled = ld_meta.get('enabled', False)

tracker = LDAIConfigTracker(
self._client,
variation.get('_ldMeta', {}).get('variationKey', ''),
ld_meta.get('variationKey', ''),
key,
int(variation.get('_ldMeta', {}).get('version', 1)),
int(ld_meta.get('version', 1)),
model.name if model else '',
provider_config.name if provider_config else '',
context,
observe_config=self._observe_config,
enabled=bool(enabled),
)

enabled = variation.get('_ldMeta', {}).get('enabled', False)

judge_configuration = None
if 'judgeConfiguration' in variation and isinstance(variation['judgeConfiguration'], dict):
judge_config = variation['judgeConfiguration']
Expand Down
Loading
Loading