From eae79aa3beb841e8bc981791118d2e63a5d47372 Mon Sep 17 00:00:00 2001 From: Hassieb Pakzad <68423100+hassiebp@users.noreply.github.com> Date: Mon, 30 Mar 2026 14:53:47 +0200 Subject: [PATCH 1/2] fix(langchain): exit propagation context gracefully --- langfuse/_client/propagation.py | 23 ++++++++- langfuse/langchain/CallbackHandler.py | 43 +++++++++-------- tests/test_langchain_callback_handler.py | 59 ++++++++++++++++++++++++ 3 files changed, 104 insertions(+), 21 deletions(-) create mode 100644 tests/test_langchain_callback_handler.py diff --git a/langfuse/_client/propagation.py b/langfuse/_client/propagation.py index e0c68db00..2de9cefb9 100644 --- a/langfuse/_client/propagation.py +++ b/langfuse/_client/propagation.py @@ -7,7 +7,9 @@ from typing import Any, Dict, Generator, List, Literal, Optional, TypedDict, Union, cast -from opentelemetry import baggage +from opentelemetry import ( + baggage, +) from opentelemetry import ( baggage as otel_baggage_api, ) @@ -17,6 +19,7 @@ from opentelemetry import ( trace as otel_trace_api, ) +from opentelemetry.context import _RUNTIME_CONTEXT from opentelemetry.util._decorator import ( _AgnosticContextManager, _agnosticcontextmanager, @@ -72,6 +75,22 @@ class PropagatedExperimentAttributes(TypedDict): experiment_item_root_observation_id: str +def _detach_context_token_safely(token: Any) -> None: + """Detach a context token without emitting noisy async teardown errors. + + OpenTelemetry tokens are backed by ``contextvars`` and must be detached in the + same execution context where they were attached. Async frameworks can legitimately + end spans or unwind context managers in a different task/context, in which case + detach raises and the public OpenTelemetry helper logs an error. At that point the + observation is already completed, so the mismatch is safe to ignore. + """ + + try: + _RUNTIME_CONTEXT.detach(token) + except Exception: + pass + + def propagate_attributes( *, user_id: Optional[str] = None, @@ -272,7 +291,7 @@ def _propagate_attributes( yield finally: - otel_context_api.detach(token) + _detach_context_token_safely(token) def _get_propagated_attributes_from_context( diff --git a/langfuse/langchain/CallbackHandler.py b/langfuse/langchain/CallbackHandler.py index 373461c51..5b5dfe691 100644 --- a/langfuse/langchain/CallbackHandler.py +++ b/langfuse/langchain/CallbackHandler.py @@ -15,13 +15,13 @@ import pydantic from opentelemetry import context, trace -from opentelemetry.context import _RUNTIME_CONTEXT from opentelemetry.util._decorator import _AgnosticContextManager from langfuse import propagate_attributes from langfuse._client.attributes import LangfuseOtelSpanAttributes from langfuse._client.client import Langfuse from langfuse._client.get_client import get_client +from langfuse._client.propagation import _detach_context_token_safely from langfuse._client.span import ( LangfuseAgent, LangfuseChain, @@ -458,18 +458,7 @@ def _detach_observation( token = self._context_tokens.pop(run_id, None) if token: - try: - # Directly detach from runtime context to avoid error logging - _RUNTIME_CONTEXT.detach(token) - except Exception: - # Context detach can fail in async scenarios - this is expected and safe to ignore - # The span itself was properly ended and tracing data is correctly captured - # - # Examples: - # 1. Token created in one async task/thread, detached in another - # 2. Context already detached by framework or other handlers - # 3. Runtime context state mismatch in concurrent execution - pass + _detach_context_token_safely(token) return cast( Union[ @@ -564,11 +553,8 @@ def on_chain_end( input=kwargs.get("inputs"), ) - if ( - parent_run_id is None - and self._propagation_context_manager is not None - ): - self._propagation_context_manager.__exit__(None, None, None) + if parent_run_id is None: + self._exit_propagation_context() span.end() @@ -579,6 +565,7 @@ def on_chain_end( finally: if parent_run_id is None: + self._exit_propagation_context() self._reset() def on_chain_error( @@ -608,10 +595,19 @@ def on_chain_error( status_message=str(error) if level else None, input=kwargs.get("inputs"), cost_details={"total": 0}, - ).end() + ) + + if parent_run_id is None: + self._exit_propagation_context() + + observation.end() except Exception as e: langfuse_logger.exception(e) + finally: + if parent_run_id is None: + self._exit_propagation_context() + self._reset() def on_chat_model_start( self, @@ -1026,6 +1022,15 @@ def on_llm_error( def _reset(self) -> None: self._child_to_parent_run_id_map = {} + def _exit_propagation_context(self) -> None: + manager = self._propagation_context_manager + + if manager is None: + return + + self._propagation_context_manager = None + manager.__exit__(None, None, None) + def __join_tags_and_metadata( self, tags: Optional[List[str]] = None, diff --git a/tests/test_langchain_callback_handler.py b/tests/test_langchain_callback_handler.py new file mode 100644 index 000000000..34fbb1994 --- /dev/null +++ b/tests/test_langchain_callback_handler.py @@ -0,0 +1,59 @@ +import logging +from importlib import import_module +from unittest.mock import MagicMock +from uuid import uuid4 + +from langfuse._client import propagation +from langfuse._client.propagation import propagate_attributes +from langfuse.langchain.CallbackHandler import LangchainCallbackHandler + + +def test_propagate_attributes_swallows_context_mismatch(monkeypatch, caplog): + fake_token = object() + + monkeypatch.setattr( + propagation.otel_context_api, + "attach", + lambda *, context: fake_token, + ) + + def raise_context_mismatch(token): + assert token is fake_token + raise ValueError("token was created in a different Context") + + monkeypatch.setattr(propagation._RUNTIME_CONTEXT, "detach", raise_context_mismatch) + + caplog.set_level(logging.ERROR, logger="opentelemetry.context") + + with propagate_attributes(user_id="test-user"): + pass + + assert not [ + record + for record in caplog.records + if record.name == "opentelemetry.context" + and "Failed to detach context" in record.getMessage() + ] + + +def test_on_chain_error_exits_root_propagation_context(monkeypatch): + mock_client = MagicMock() + callback_handler_module = import_module("langfuse.langchain.CallbackHandler") + monkeypatch.setattr( + callback_handler_module, "get_client", lambda public_key=None: mock_client + ) + + handler = LangchainCallbackHandler() + manager = MagicMock() + observation = MagicMock() + observation.update.return_value = observation + run_id = uuid4() + + handler._propagation_context_manager = manager + monkeypatch.setattr(handler, "_detach_observation", lambda _: observation) + + handler.on_chain_error(GeneratorExit(), run_id=run_id, parent_run_id=None) + + manager.__exit__.assert_called_once_with(None, None, None) + observation.end.assert_called_once() + assert handler._propagation_context_manager is None From 71146c21b2f745ca71c3937bc5b117c27bb6e958 Mon Sep 17 00:00:00 2001 From: Hassieb Pakzad <68423100+hassiebp@users.noreply.github.com> Date: Mon, 30 Mar 2026 16:02:50 +0200 Subject: [PATCH 2/2] push --- tests/test_langchain_callback_handler.py | 59 ------------------------ 1 file changed, 59 deletions(-) delete mode 100644 tests/test_langchain_callback_handler.py diff --git a/tests/test_langchain_callback_handler.py b/tests/test_langchain_callback_handler.py deleted file mode 100644 index 34fbb1994..000000000 --- a/tests/test_langchain_callback_handler.py +++ /dev/null @@ -1,59 +0,0 @@ -import logging -from importlib import import_module -from unittest.mock import MagicMock -from uuid import uuid4 - -from langfuse._client import propagation -from langfuse._client.propagation import propagate_attributes -from langfuse.langchain.CallbackHandler import LangchainCallbackHandler - - -def test_propagate_attributes_swallows_context_mismatch(monkeypatch, caplog): - fake_token = object() - - monkeypatch.setattr( - propagation.otel_context_api, - "attach", - lambda *, context: fake_token, - ) - - def raise_context_mismatch(token): - assert token is fake_token - raise ValueError("token was created in a different Context") - - monkeypatch.setattr(propagation._RUNTIME_CONTEXT, "detach", raise_context_mismatch) - - caplog.set_level(logging.ERROR, logger="opentelemetry.context") - - with propagate_attributes(user_id="test-user"): - pass - - assert not [ - record - for record in caplog.records - if record.name == "opentelemetry.context" - and "Failed to detach context" in record.getMessage() - ] - - -def test_on_chain_error_exits_root_propagation_context(monkeypatch): - mock_client = MagicMock() - callback_handler_module = import_module("langfuse.langchain.CallbackHandler") - monkeypatch.setattr( - callback_handler_module, "get_client", lambda public_key=None: mock_client - ) - - handler = LangchainCallbackHandler() - manager = MagicMock() - observation = MagicMock() - observation.update.return_value = observation - run_id = uuid4() - - handler._propagation_context_manager = manager - monkeypatch.setattr(handler, "_detach_observation", lambda _: observation) - - handler.on_chain_error(GeneratorExit(), run_id=run_id, parent_run_id=None) - - manager.__exit__.assert_called_once_with(None, None, None) - observation.end.assert_called_once() - assert handler._propagation_context_manager is None