diff --git a/langfuse/_client/propagation.py b/langfuse/_client/propagation.py index e0c68db00..2de9cefb9 100644 --- a/langfuse/_client/propagation.py +++ b/langfuse/_client/propagation.py @@ -7,7 +7,9 @@ from typing import Any, Dict, Generator, List, Literal, Optional, TypedDict, Union, cast -from opentelemetry import baggage +from opentelemetry import ( + baggage, +) from opentelemetry import ( baggage as otel_baggage_api, ) @@ -17,6 +19,7 @@ from opentelemetry import ( trace as otel_trace_api, ) +from opentelemetry.context import _RUNTIME_CONTEXT from opentelemetry.util._decorator import ( _AgnosticContextManager, _agnosticcontextmanager, @@ -72,6 +75,22 @@ class PropagatedExperimentAttributes(TypedDict): experiment_item_root_observation_id: str +def _detach_context_token_safely(token: Any) -> None: + """Detach a context token without emitting noisy async teardown errors. + + OpenTelemetry tokens are backed by ``contextvars`` and must be detached in the + same execution context where they were attached. Async frameworks can legitimately + end spans or unwind context managers in a different task/context, in which case + detach raises and the public OpenTelemetry helper logs an error. At that point the + observation is already completed, so the mismatch is safe to ignore. + """ + + try: + _RUNTIME_CONTEXT.detach(token) + except Exception: + pass + + def propagate_attributes( *, user_id: Optional[str] = None, @@ -272,7 +291,7 @@ def _propagate_attributes( yield finally: - otel_context_api.detach(token) + _detach_context_token_safely(token) def _get_propagated_attributes_from_context( diff --git a/langfuse/langchain/CallbackHandler.py b/langfuse/langchain/CallbackHandler.py index 373461c51..5b5dfe691 100644 --- a/langfuse/langchain/CallbackHandler.py +++ b/langfuse/langchain/CallbackHandler.py @@ -15,13 +15,13 @@ import pydantic from opentelemetry import context, trace -from opentelemetry.context import _RUNTIME_CONTEXT from opentelemetry.util._decorator import _AgnosticContextManager from langfuse import propagate_attributes from langfuse._client.attributes import LangfuseOtelSpanAttributes from langfuse._client.client import Langfuse from langfuse._client.get_client import get_client +from langfuse._client.propagation import _detach_context_token_safely from langfuse._client.span import ( LangfuseAgent, LangfuseChain, @@ -458,18 +458,7 @@ def _detach_observation( token = self._context_tokens.pop(run_id, None) if token: - try: - # Directly detach from runtime context to avoid error logging - _RUNTIME_CONTEXT.detach(token) - except Exception: - # Context detach can fail in async scenarios - this is expected and safe to ignore - # The span itself was properly ended and tracing data is correctly captured - # - # Examples: - # 1. Token created in one async task/thread, detached in another - # 2. Context already detached by framework or other handlers - # 3. Runtime context state mismatch in concurrent execution - pass + _detach_context_token_safely(token) return cast( Union[ @@ -564,11 +553,8 @@ def on_chain_end( input=kwargs.get("inputs"), ) - if ( - parent_run_id is None - and self._propagation_context_manager is not None - ): - self._propagation_context_manager.__exit__(None, None, None) + if parent_run_id is None: + self._exit_propagation_context() span.end() @@ -579,6 +565,7 @@ def on_chain_end( finally: if parent_run_id is None: + self._exit_propagation_context() self._reset() def on_chain_error( @@ -608,10 +595,19 @@ def on_chain_error( status_message=str(error) if level else None, input=kwargs.get("inputs"), cost_details={"total": 0}, - ).end() + ) + + if parent_run_id is None: + self._exit_propagation_context() + + observation.end() except Exception as e: langfuse_logger.exception(e) + finally: + if parent_run_id is None: + self._exit_propagation_context() + self._reset() def on_chat_model_start( self, @@ -1026,6 +1022,15 @@ def on_llm_error( def _reset(self) -> None: self._child_to_parent_run_id_map = {} + def _exit_propagation_context(self) -> None: + manager = self._propagation_context_manager + + if manager is None: + return + + self._propagation_context_manager = None + manager.__exit__(None, None, None) + def __join_tags_and_metadata( self, tags: Optional[List[str]] = None,