Skip to content

fix(async-streaming): harden context preservation#1576

Open
hassiebp wants to merge 2 commits intomainfrom
harden-async-streaming
Open

fix(async-streaming): harden context preservation#1576
hassiebp wants to merge 2 commits intomainfrom
harden-async-streaming

Conversation

@hassiebp
Copy link
Contributor

@hassiebp hassiebp commented Mar 25, 2026

Disclaimer: Experimental PR review

Greptile Summary

This PR hardens context preservation for async and sync streaming by ensuring generator wrappers properly finalize their Langfuse spans on early consumer break, exception, explicit close()/aclose(), and throw operations. It also fixes the previously broken context-manager protocol on LangfuseResponseGeneratorSync and LangfuseResponseGeneratorAsync (__enter__ was returning an iterator instead of self; __exit__ was a no-op).

Key changes:

  • _ContextPreservedSyncGeneratorWrapper.__iter__ is converted to a generator with a finally: close() guard, correctly draining and closing the inner generator on early break.
  • Both wrapper classes gain _finalized idempotency flags, extracted _finalize() methods, and explicit close()/throw() (sync) and aclose()/athrow() (async) methods.
  • _run_in_preserved_context() is refactored to avoid the repeated try/except TypeError for Python < 3.10.
  • LangfuseResponseGeneratorSync/Async context-manager entry now correctly returns self and exit now calls close()/aclose().
  • New unit tests cover early-break finalization, idempotent close, and the Python < 3.10 create_task fallback.

One gap remains: in LangfuseResponseGeneratorSync.__iter__ (line 1024–1025) and LangfuseResponseGeneratorAsync.__aiter__ (line 1115–1116), the finally block calls self._finalize() directly rather than self.close(). This sets _finalized = True before the underlying response.close()/aclose() is called. If the stream is abandoned mid-iteration through the for loop (as opposed to through __next__) and then close() is invoked (e.g. via __exit__), close() returns early and the underlying HTTP connection is never explicitly closed.

Confidence Score: 4/5

  • Safe to merge after addressing the __iter__/__aiter__ finally calling _finalize() instead of close(), which can leave HTTP connections unclosed on early break.
  • The core fixes are solid and well-tested. The _ContextPreservedSyncGeneratorWrapper early-break path is handled correctly. The one concrete issue — LangfuseResponseGeneratorSync.__iter__ and LangfuseResponseGeneratorAsync.__aiter__ bypassing response.close() — is a resource-management gap introduced in this PR, but it requires a specific usage pattern (context manager + for-loop + break) and Python's GC provides a safety net. The fix is a one-line change in each class.
  • langfuse/openai.py__iter__ and __aiter__ finally blocks (lines 1024–1025 and 1115–1116)

Important Files Changed

Filename Overview
langfuse/_client/observe.py Core fix file – adds _finalized idempotency, proper close()/throw() delegation, and early-break finally guard in __iter__ for the sync wrapper. The async wrapper gains aclose(), athrow(), and a cleaner _run_in_preserved_context(). Logic is sound and the _ContextPreservedSyncGeneratorWrapper early-break path is correctly handled.
langfuse/openai.py Adds _finalized guard, fixes context-manager entry/exit, and introduces close()/aclose() for both generator classes. The __iter__/__aiter__ finally blocks still call _finalize() directly instead of close(), leaving the underlying response unclosed when the stream is abandoned mid-iteration through the for-loop path.
tests/test_decorators.py Adds unit tests for early-break finalization (sync + async), aclosing() pattern, and Python < 3.10 create_task fallback. Good coverage of the new wrapper methods using a lightweight _FakeObservation stub.
tests/test_openai.py Adds unit tests for idempotent close()/aclose() on partial streams. Tests only exercise the next()/__anext__() path, not the for chunk in stream: break path through __iter__/__aiter__, so the resource-leak edge case goes untested.

Sequence Diagram

sequenceDiagram
    participant C as Consumer
    participant W as Generator Wrapper
    participant G as Inner Generator / Response
    participant S as Langfuse Span

    Note over C,S: Happy path – full consumption
    C->>W: for item in wrapper / next()
    W->>G: context.run(next, generator)
    G-->>W: item
    W-->>C: item
    G-->>W: StopIteration / StopAsyncIteration
    W->>W: _finalize()
    W->>S: span.update(output=...).end()

    Note over C,S: Early break – sync (_ContextPreservedSyncGeneratorWrapper)
    C->>W: for item in wrapper → break
    W->>W: __iter__ finally: if not _finalized → close()
    W->>G: context.run(generator.close)
    W->>W: _finalize()
    W->>S: span.update(output=partial).end()

    Note over C,S: Early break – LangfuseResponseGeneratorSync (gap)
    C->>W: for chunk in stream → break
    W->>W: __iter__ finally: _finalize() [sets _finalized=True]
    W->>S: generation.end()
    C->>W: close() via __exit__
    W->>W: _finalized==True → return early
    W--xG: response.close() NEVER called
Loading

Comments Outside Diff (1)

  1. langfuse/openai.py, line 1024-1025 (link)

    P1 close() skipped when iterated via __iter__ with early break

    When consumers use the for chunk in stream: path (which goes through __iter__), the finally block calls self._finalize(), setting _finalized = True. If iteration is abandoned early (e.g. break), the __exit__ method then calls close(), but close() sees _finalized == True and returns immediately — so self.response.close() is never called, leaving the underlying HTTP connection open.

    The new tests only exercise the next(stream) path (__next__), which does not trigger this issue. A representative scenario:

    with LangfuseResponseGeneratorSync(...) as stream:
        for chunk in stream:      # uses __iter__
            process(chunk)
            break                 # exits early
    # __exit__ → close() → _finalized=True → early return, response never closed

    The same problem exists in LangfuseResponseGeneratorAsync.__aiter__ (line 1115–1116), where finally: await self._finalize() sets the flag before aclose() can close the underlying response.

    Changing __iter__'s finally to call self.close() instead of self._finalize() would fix both paths, since close() is already idempotent via the _finalized guard, and _finalize() is called inside close().

Reviews (1): Last reviewed commit: "push;" | Re-trigger Greptile

@github-actions
Copy link

@claude review

Copy link

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: edf45e3a7e

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment on lines +1189 to +1190
if self._finalized:
return

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Close async response even when stream already finalized

LangfuseResponseGeneratorAsync.__aiter__ always runs _finalize() in its finally, so after any async for exit (including an early break) _finalized is already True; with this new guard, a later explicit await stream.aclose() becomes a no-op and never reaches self.response.aclose()/close(). That regresses resource cleanup for partial-consumption flows and can leave the underlying HTTP stream open, especially in patterns that break early and then call aclose() to release the connection.

Useful? React with 👍 / 👎.

Comment on lines 683 to +684
self.transform_fn = transform_fn
self._finalized = False
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 The _ContextPreservedAsyncGeneratorWrapper.__aiter__ method returns self, making it a plain async iterator rather than an async generator, so Python will not auto-call aclose() when a caller breaks out of async for item in wrapper: early, leaving the span permanently unfinalized. The sync wrapper was correctly hardened in this PR by converting __iter__ into a generator function with a finally: self.close() block, but the async counterpart was not given the same treatment; callers must explicitly use aclosing(wrapper) to avoid the leak, as the new test itself demonstrates.

Extended reasoning...

What the bug is and how it manifests

_ContextPreservedAsyncGeneratorWrapper implements __aiter__ by returning self, which makes instances plain async iterators, not async generators. Python's async for loop only automatically calls aclose() on objects whose runtime type is types.AsyncGeneratorType (produced by calling an async def function that contains yield). For plain async iterators that merely implement __aiter__/__anext__, Python places cleanup responsibility entirely on the caller.

The specific code path that triggers it

When user code does async for item in async_gen_wrapper: process(item); break, Python calls __aiter__() once (returns self), then calls __anext__() in a loop. On break, the loop simply stops -- it does NOT call aclose() because the wrapper is not a real async generator. None of _finalize()'s three call-sites are reached: exhaustion (StopAsyncIteration), exception, or explicit aclose().

Why existing code doesn't prevent it

The _finalize() method and _finalized guard are both correct and idempotent. The problem is structural: _finalize() is never invoked in the early-break scenario because the wrapper is a plain async iterator, not an async generator, so the Python runtime provides no automatic cleanup hook.

What the impact would be

Every early break from async for item in wrapper: that does not explicitly call await wrapper.aclose() leaves the associated Langfuse span permanently open. span.end() is never called, no output is recorded. This is a silent observability and resource leak affecting any caller of an @observe-decorated async generator function.

How to fix it

Apply the same pattern used for the sync wrapper: convert __aiter__ into an async generator method with a finally block:

async def __aiter__(self):
    try:
        while True:
            yield await self.__anext__()
    except StopAsyncIteration:
        return
    finally:
        if not self._finalized:
            await self.aclose()

This returns an actual async generator object, so Python automatically calls aclose() on early break.

Step-by-step proof

  1. @observe wraps an async generator; _wrap_async_generator_result returns a _ContextPreservedAsyncGeneratorWrapper instance w.
  2. Caller does async for item in w: break.
  3. Python calls w.__aiter__() -- returns w itself (type is _ContextPreservedAsyncGeneratorWrapper, not AsyncGeneratorType).
  4. Python calls await w.__anext__() -- yields the first item.
  5. break exits the loop. Python does NOT call await w.aclose() -- w is a plain async iterator.
  6. w._finalize() is never called. w.span.end() is never called. Span remains open indefinitely.
  7. The new test test_async_generator_wrapper_aclose_finalizes_partial_output explicitly uses async with aclosing(wrapper) as stream: as a workaround -- direct evidence that without aclosing, the span is not finalized.

Comment on lines 600 to +614

if self.transform_fn is not None:
output = self.transform_fn(self.items)
except (Exception, asyncio.CancelledError) as e:
self._finalize(error=e)
raise

elif all(isinstance(item, str) for item in self.items):
output = "".join(self.items)
def close(self) -> None:
if self._finalized:
return

self.span.update(output=output).end()
try:
close_method = getattr(self.generator, "close", None)
if callable(close_method):
self.context.run(close_method)
except (Exception, asyncio.CancelledError) as e:
self._finalize(error=e)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 In _ContextPreservedSyncGeneratorWrapper.close(), _finalize() is only called if close_method() either succeeds or raises Exception/asyncio.CancelledError; if it raises a BaseException not matched by that handler (e.g. KeyboardInterrupt, SystemExit), the trailing self._finalize() is never reached and the span leaks open forever. The same PR introduces LangfuseResponseGeneratorSync.close() which correctly uses try/finally to guarantee finalization — _ContextPreservedSyncGeneratorWrapper.close() should be updated to match.

Extended reasoning...

What the bug is

In _ContextPreservedSyncGeneratorWrapper.close() (lines 600-614 of langfuse/_client/observe.py), the call to self.context.run(close_method) is inside a try/except (Exception, asyncio.CancelledError) block. The span-finalizing call self._finalize() sits after that block, outside any finally clause:

def close(self) -> None:
    if self._finalized:
        return
    try:
        close_method = getattr(self.generator, 'close', None)
        if callable(close_method):
            self.context.run(close_method)
    except (Exception, asyncio.CancelledError) as e:
        self._finalize(error=e)
        raise
    self._finalize()  # only reached if no uncaught BaseException propagates

The specific code path that triggers it

If close_method() raises a BaseException subclass not covered by the except clause — for example KeyboardInterrupt or SystemExit — the handler is bypassed, and execution unwinds past the try/except block without ever reaching the trailing self._finalize(). The span is left open with _finalized = False.

Why existing code does not prevent it

The __iter__ method does have a finally block that calls self.close() on early loop exit, which handles the for-loop break case. However, once control is inside close() itself, if close_method() raises a BaseException, the __iter__ finally has already fired (or is not on the call stack). There is no further safety net. _finalized remains False and the span is genuinely leaked.

Comparison with the correct pattern introduced in this same PR

LangfuseResponseGeneratorSync.close() (also new in this PR) uses try/finally correctly:

def close(self) -> None:
    if self._finalized:
        return
    close_method = getattr(self.response, 'close', None)
    if callable(close_method):
        try:
            close_method()
        finally:
            self._finalize()  # always runs regardless of exception type
        return
    self._finalize()

_ContextPreservedAsyncGeneratorWrapper.aclose() also uses try/finally. The asymmetry is limited to _ContextPreservedSyncGeneratorWrapper.close().

Impact

If KeyboardInterrupt or SystemExit fires during generator teardown, the Langfuse span is never ended. This is a rare edge case (signal or interpreter shutdown during active generator close), but it is a real span leak that contradicts the always-finalize invariant the PR is carefully establishing everywhere else.

How to fix

Use try/finally while preserving error annotation on caught exceptions:

def close(self) -> None:
    if self._finalized:
        return
    error = None
    try:
        close_method = getattr(self.generator, 'close', None)
        if callable(close_method):
            self.context.run(close_method)
    except (Exception, asyncio.CancelledError) as e:
        error = e
        raise
    finally:
        self._finalize(error=error)

Step-by-step proof

  1. wrapper = _ContextPreservedSyncGeneratorWrapper(g, ctx, span, None) — span is open, _finalized=False.
  2. Consumer iterates for item in wrapper then breaks — __iter__'s finally clause calls wrapper.close().
  3. Inside close(): _finalized is False, we enter the try block and call self.context.run(close_method).
  4. The underlying generator's close() executes and a KeyboardInterrupt is raised (e.g. via OS signal handler active at that moment).
  5. KeyboardInterrupt is not an Exception or asyncio.CancelledError; the except clause does not match it.
  6. The exception propagates out of the try/except entirely. The line self._finalize() after the block is never executed.
  7. _finalized stays False; span.end() is never called; the span is leaked indefinitely.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant