Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion langfuse/_client/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"""

import asyncio
import contextvars
import json
import threading
from hashlib import sha256
Expand Down Expand Up @@ -69,13 +70,14 @@ class _RunAsyncThread(threading.Thread):

def __init__(self, coro: Coroutine[Any, Any, Any]) -> None:
self.coro = coro
self.context = contextvars.copy_context()
self.result: Any = None
self.exception: Exception | None = None
super().__init__()

def run(self) -> None:
try:
self.result = asyncio.run(self.coro)
self.result = self.context.run(asyncio.run, self.coro)
except Exception as e:
self.exception = e

Expand Down
30 changes: 30 additions & 0 deletions tests/test_propagate_attributes.py
Original file line number Diff line number Diff line change
Expand Up @@ -2295,6 +2295,36 @@ def test_tags_attribute_key_format(self, langfuse_client, memory_exporter):
class TestPropagateAttributesExperiment(TestPropagateAttributesBase):
"""Tests for experiment attribute propagation."""

@pytest.mark.asyncio
async def test_experiment_propagates_user_id_in_async_context(
self, langfuse_client, memory_exporter
):
"""Verify run_experiment keeps propagated attributes when called from async code."""
import asyncio

local_data = [{"input": "test input", "expected_output": "expected output"}]

async def async_task(*, item, **kwargs):
await asyncio.sleep(0.01)
return f"processed: {item['input']}"

with propagate_attributes(user_id="async-experiment-user"):
langfuse_client.run_experiment(
name="Async Experiment",
data=local_data,
task=async_task,
)

langfuse_client.flush()
time.sleep(0.1)

root_span = self.get_span_by_name(memory_exporter, "experiment-item-run")
self.verify_span_attribute(
root_span,
LangfuseOtelSpanAttributes.TRACE_USER_ID,
"async-experiment-user",
)

def test_experiment_attributes_propagate_without_dataset(
self, langfuse_client, memory_exporter
):
Expand Down
17 changes: 17 additions & 0 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Test suite for utility functions in langfuse._client.utils module."""

import asyncio
import contextvars
import threading
from unittest import mock

Expand Down Expand Up @@ -81,6 +82,22 @@ async def check_thread_isolation():
result = run_async_safely(check_thread_isolation())
assert result == "isolated"

@pytest.mark.asyncio
async def test_run_async_context_preserves_contextvars(self):
"""Test that threaded execution preserves the caller's contextvars."""
request_id = contextvars.ContextVar("request_id")
token = request_id.set("req-123")

async def read_contextvar():
await asyncio.sleep(0.001)
return request_id.get()

try:
result = run_async_safely(read_contextvar())
assert result == "req-123"
finally:
request_id.reset(token)

def test_multiple_calls_sync_context(self):
"""Test multiple sequential calls in sync context."""

Expand Down
Loading