Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions langfuse/_client/propagation.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@

from typing import Any, Dict, Generator, List, Literal, Optional, TypedDict, Union, cast

from opentelemetry import baggage
from opentelemetry import (
baggage,
)
from opentelemetry import (
baggage as otel_baggage_api,
)
Expand All @@ -17,6 +19,7 @@
from opentelemetry import (
trace as otel_trace_api,
)
from opentelemetry.context import _RUNTIME_CONTEXT
from opentelemetry.util._decorator import (
_AgnosticContextManager,
_agnosticcontextmanager,
Expand Down Expand Up @@ -72,6 +75,22 @@ class PropagatedExperimentAttributes(TypedDict):
experiment_item_root_observation_id: str


def _detach_context_token_safely(token: Any) -> None:
    """Best-effort detach of an OpenTelemetry context token.

    Context tokens are backed by ``contextvars`` and can only be detached from
    the execution context in which they were attached. Async frameworks may
    legitimately unwind spans or context managers from a different task, in
    which case the detach raises and the public OpenTelemetry helper would log
    a noisy error. By that point the observation has already been completed,
    so the mismatch is harmless and is swallowed silently here.
    """
    try:
        _RUNTIME_CONTEXT.detach(token)
    except Exception:
        # Cross-context detach mismatch — expected during async teardown,
        # safe to ignore because the span/observation was already ended.
        pass


def propagate_attributes(
*,
user_id: Optional[str] = None,
Expand Down Expand Up @@ -272,7 +291,7 @@ def _propagate_attributes(
yield

finally:
otel_context_api.detach(token)
_detach_context_token_safely(token)


def _get_propagated_attributes_from_context(
Expand Down
43 changes: 24 additions & 19 deletions langfuse/langchain/CallbackHandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@

import pydantic
from opentelemetry import context, trace
from opentelemetry.context import _RUNTIME_CONTEXT
from opentelemetry.util._decorator import _AgnosticContextManager

from langfuse import propagate_attributes
from langfuse._client.attributes import LangfuseOtelSpanAttributes
from langfuse._client.client import Langfuse
from langfuse._client.get_client import get_client
from langfuse._client.propagation import _detach_context_token_safely
from langfuse._client.span import (
LangfuseAgent,
LangfuseChain,
Expand Down Expand Up @@ -458,18 +458,7 @@ def _detach_observation(
token = self._context_tokens.pop(run_id, None)

if token:
try:
# Directly detach from runtime context to avoid error logging
_RUNTIME_CONTEXT.detach(token)
except Exception:
# Context detach can fail in async scenarios - this is expected and safe to ignore
# The span itself was properly ended and tracing data is correctly captured
#
# Examples:
# 1. Token created in one async task/thread, detached in another
# 2. Context already detached by framework or other handlers
# 3. Runtime context state mismatch in concurrent execution
pass
_detach_context_token_safely(token)

return cast(
Union[
Expand Down Expand Up @@ -564,11 +553,8 @@ def on_chain_end(
input=kwargs.get("inputs"),
)

if (
parent_run_id is None
and self._propagation_context_manager is not None
):
self._propagation_context_manager.__exit__(None, None, None)
if parent_run_id is None:
self._exit_propagation_context()

span.end()

Expand All @@ -579,6 +565,7 @@ def on_chain_end(

finally:
if parent_run_id is None:
self._exit_propagation_context()
self._reset()

def on_chain_error(
Expand Down Expand Up @@ -608,10 +595,19 @@ def on_chain_error(
status_message=str(error) if level else None,
input=kwargs.get("inputs"),
cost_details={"total": 0},
).end()
)

if parent_run_id is None:
self._exit_propagation_context()

observation.end()

except Exception as e:
langfuse_logger.exception(e)
finally:
if parent_run_id is None:
self._exit_propagation_context()
self._reset()

def on_chat_model_start(
self,
Expand Down Expand Up @@ -1026,6 +1022,15 @@ def on_llm_error(
def _reset(self) -> None:
self._child_to_parent_run_id_map = {}

def _exit_propagation_context(self) -> None:
manager = self._propagation_context_manager

if manager is None:
return

self._propagation_context_manager = None
manager.__exit__(None, None, None)

def __join_tags_and_metadata(
self,
tags: Optional[List[str]] = None,
Expand Down
Loading