diff --git a/langfuse/_client/utils.py b/langfuse/_client/utils.py index 16d963d88..ccce3d9ef 100644 --- a/langfuse/_client/utils.py +++ b/langfuse/_client/utils.py @@ -5,6 +5,7 @@ """ import asyncio +import contextvars import json import threading from hashlib import sha256 @@ -69,13 +70,14 @@ class _RunAsyncThread(threading.Thread): def __init__(self, coro: Coroutine[Any, Any, Any]) -> None: self.coro = coro + self.context = contextvars.copy_context() self.result: Any = None self.exception: Exception | None = None super().__init__() def run(self) -> None: try: - self.result = asyncio.run(self.coro) + self.result = self.context.run(asyncio.run, self.coro) except Exception as e: self.exception = e diff --git a/tests/test_propagate_attributes.py b/tests/test_propagate_attributes.py index 566ba6392..b3be9f830 100644 --- a/tests/test_propagate_attributes.py +++ b/tests/test_propagate_attributes.py @@ -2295,6 +2295,36 @@ def test_tags_attribute_key_format(self, langfuse_client, memory_exporter): class TestPropagateAttributesExperiment(TestPropagateAttributesBase): """Tests for experiment attribute propagation.""" + @pytest.mark.asyncio + async def test_experiment_propagates_user_id_in_async_context( + self, langfuse_client, memory_exporter + ): + """Verify run_experiment keeps propagated attributes when called from async code.""" + import asyncio + + local_data = [{"input": "test input", "expected_output": "expected output"}] + + async def async_task(*, item, **kwargs): + await asyncio.sleep(0.01) + return f"processed: {item['input']}" + + with propagate_attributes(user_id="async-experiment-user"): + langfuse_client.run_experiment( + name="Async Experiment", + data=local_data, + task=async_task, + ) + + langfuse_client.flush() + time.sleep(0.1) + + root_span = self.get_span_by_name(memory_exporter, "experiment-item-run") + self.verify_span_attribute( + root_span, + LangfuseOtelSpanAttributes.TRACE_USER_ID, + "async-experiment-user", + ) + def test_experiment_attributes_propagate_without_dataset( self, langfuse_client, memory_exporter ): diff --git a/tests/test_utils.py b/tests/test_utils.py index ac3ee8473..968bcb91b 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1,6 +1,7 @@ """Test suite for utility functions in langfuse._client.utils module.""" import asyncio +import contextvars import threading from unittest import mock @@ -81,6 +82,22 @@ async def check_thread_isolation(): result = run_async_safely(check_thread_isolation()) assert result == "isolated" + @pytest.mark.asyncio + async def test_run_async_context_preserves_contextvars(self): + """Test that threaded execution preserves the caller's contextvars.""" + request_id = contextvars.ContextVar("request_id") + token = request_id.set("req-123") + + async def read_contextvar(): + await asyncio.sleep(0.001) + return request_id.get() + + try: + result = run_async_safely(read_contextvar()) + assert result == "req-123" + finally: + request_id.reset(token) + def test_multiple_calls_sync_context(self): """Test multiple sequential calls in sync context."""