fix(experiments): maintain propagated context in async experiments#1587
fix(experiments): maintain propagated context in async experiments#1587
Conversation
|
@claude review |
There was a problem hiding this comment.
LGTM — the fix is correct and well-tested.
Extended reasoning...
Overview
The PR modifies _RunAsyncThread in langfuse/_client/utils.py to capture the caller's contextvars context via contextvars.copy_context() at construction time and then execute the coroutine within that context using self.context.run(asyncio.run, self.coro). This is a 3-line production change plus tests.
Security risks
None. This is a pure context propagation fix with no auth, crypto, or permission implications.
Level of scrutiny
Low. The change is small, follows the documented Python pattern for context propagation across threads, and is well-covered by the new unit test in test_utils.py and integration test in test_propagate_attributes.py.
Other factors
Two bugs were flagged by the bug hunter: a nit about time.sleep vs await asyncio.sleep in the new async test (the test still passes correctly; no competing coroutines), and a pre-existing dead else-branch in run_async_safely that is unrelated to this PR. Neither warrants blocking the fix.
Disclaimer: Experimental PR review
Greptile Summary
This PR fixes context propagation for async experiments by capturing the caller's
contextvarscontext at the point where_RunAsyncThreadis created and executing the coroutine within that copied context. Previously, whenrun_experimentwas called from an async context (e.g. a FastAPI handler or Jupyter notebook), the worker thread started a brand-new context, silently dropping anyContextVarvalues — such asuser_idset viapropagate_attributes— before the experiment spans were created.Key changes:
langfuse/_client/utils.py:_RunAsyncThread.__init__now callscontextvars.copy_context()to snapshot the caller's context, andrun()usesself.context.run(asyncio.run, self.coro)to execute the coroutine inside that snapshot.tests/test_utils.py: Adds a focused unit test (test_run_async_context_preserves_contextvars) that sets aContextVarin an async caller, invokesrun_async_safely, and asserts the value is readable inside the threaded coroutine.tests/test_propagate_attributes.py: Adds an end-to-end integration test confirming thatuser_idset viapropagate_attributesis present on theexperiment-item-runspan whenrun_experimentis called from async code.Confidence Score: 5/5
Safe to merge — the fix is minimal, correct, and well-tested; the single finding is a P2 style issue (inline import) that does not affect runtime behavior.
The core change in
utils.pyis a well-understood Python idiom (contextvars.copy_context()+context.run()) with no risk of regression. Both a unit test and an integration test cover the new behavior. The only finding is a cosmetic import placement issue in the new test, which does not affect correctness or merge safety.tests/test_propagate_attributes.py — minor:
import asyncioshould be moved to the module top per project convention.Important Files Changed
contextvars.copy_context()at thread creation and executes the coroutine inside that copied context, correctly propagating ContextVar values (e.g. user_id) to async experiments running in a worker thread.test_run_async_context_preserves_contextvarsto directly verify that a ContextVar set before callingrun_async_safelyis readable inside the threaded coroutine; import moved correctly to the module top.user_idpropagation; contains animport asyncioplaced inside the test function body, violating the module-level import convention.Sequence Diagram
sequenceDiagram participant AC as Async Caller participant RAS as run_async_safely() participant RAT as _RunAsyncThread participant CTX as contextvars.Context (copied) participant CORO as Coroutine (experiment task) AC->>AC: propagate_attributes(user_id="x") sets ContextVar AC->>RAS: run_async_safely(coro) RAS->>RAS: asyncio.get_running_loop() → running loop detected RAS->>RAT: __init__(coro) — copy_context() captures current ContextVars RAS->>RAT: start() + join() RAT->>CTX: context.run(asyncio.run, coro) CTX->>CORO: executes with copied context — user_id visible CORO-->>RAT: result RAT-->>RAS: thread.result RAS-->>AC: return resultReviews (1): Last reviewed commit: "fix(experiments): maintain propagated co..." | Re-trigger Greptile
Context used:
Learnt From
langfuse/langfuse-python#1387