From a21406f13fa28ce29fca0506f9f890cffe29d457 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Mon, 16 Mar 2026 13:59:05 +0100 Subject: [PATCH 1/7] fix(anthropic): Respect iterator protocol in asynchronous streamed responses --- sentry_sdk/integrations/anthropic.py | 296 +++++++++--- .../integrations/anthropic/test_anthropic.py | 430 ++++++++++++++++++ 2 files changed, 664 insertions(+), 62 deletions(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index 98d4b50ed9..ab1422b545 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -43,6 +43,7 @@ MessageStreamManager, MessageStream, AsyncMessageStreamManager, + AsyncMessageStream, ) from anthropic.types import ( @@ -60,7 +61,15 @@ raise DidNotEnable("Anthropic not installed") if TYPE_CHECKING: - from typing import Any, AsyncIterator, Iterator, Optional, Union, Callable + from typing import ( + Any, + AsyncIterator, + Iterator, + Optional, + Union, + Callable, + Awaitable, + ) from sentry_sdk.tracing import Span from sentry_sdk._types import TextPart @@ -71,7 +80,6 @@ TextBlockParam, ToolUnionParam, ) - from anthropic.lib.streaming import AsyncMessageStream class _RecordedUsage: @@ -94,12 +102,15 @@ def setup_once() -> None: _check_minimum_version(AnthropicIntegration, version) """ - client.messages.create(stream=True) returns an instance of the Stream class, which implements the iterator protocol. + client.messages.create(stream=True) can return an instance of the Stream class, which implements the iterator protocol. + Analogously, an AsyncStream instance can be returned, which implements the asynchronous iterator protocol. + The underlying stream can be consumed using either __iter__ or __next__, so both are patched to intercept - streamed events. The streamed events are used to populate output attributes on the AI Client Span. + streamed events (and analogously, asynchronous iterators are consumed using __aiter__ or __anext__). The + streamed events are used to populate output attributes on the AI Client Span. The close() method is patched for situations in which the method is directly invoked by the user, and otherwise - the finally block in the __iter__ patch closes the span. + the finally block in the __iter__/__aiter__ patch closes the span. """ Messages.create = _wrap_message_create(Messages.create) Stream.__iter__ = _wrap_stream_iter(Stream.__iter__) @@ -107,6 +118,9 @@ def setup_once() -> None: Stream.close = _wrap_stream_close(Stream.close) AsyncMessages.create = _wrap_message_create_async(AsyncMessages.create) + AsyncStream.__aiter__ = _wrap_async_stream_aiter(AsyncStream.__aiter__) + AsyncStream.__anext__ = _wrap_async_stream_anext(AsyncStream.__anext__) + AsyncStream.close = _wrap_async_stream_close(AsyncStream.close) """ client.messages.stream() returns an instance of the MessageStream class, which implements the iterator protocol. @@ -121,6 +135,13 @@ def setup_once() -> None: MessageStreamManager.__enter__ ) + AsyncMessages.stream = _wrap_async_message_stream(AsyncMessages.stream) + AsyncMessageStreamManager.__aenter__ = ( + _wrap_async_message_stream_manager_aenter( + AsyncMessageStreamManager.__aenter__ + ) + ) + # Before https://github.com/anthropics/anthropic-sdk-python/commit/b1a1c0354a9aca450a7d512fdbdeb59c0ead688a # MessageStream inherits from Stream, so patching Stream is sufficient on these versions. if version is not None and version >= (0, 26, 2): @@ -128,12 +149,15 @@ def setup_once() -> None: MessageStream.__next__ = _wrap_message_stream_next(MessageStream.__next__) MessageStream.close = _wrap_message_stream_close(MessageStream.close) - AsyncMessages.stream = _wrap_async_message_stream(AsyncMessages.stream) - AsyncMessageStreamManager.__aenter__ = ( - _wrap_async_message_stream_manager_aenter( - AsyncMessageStreamManager.__aenter__ + AsyncMessageStream.__aiter__ = _wrap_async_message_stream_aiter( + AsyncMessageStream.__aiter__ + ) + AsyncMessageStream.__anext__ = _wrap_async_message_stream_anext( + AsyncMessageStream.__anext__ + ) + AsyncMessageStream.close = _wrap_async_message_stream_close( + AsyncMessageStream.close ) - ) def _capture_exception(exc: "Any") -> None: @@ -477,19 +501,14 @@ def _wrap_synchronous_message_iterator( async def _wrap_asynchronous_message_iterator( + stream: "Union[Stream, MessageStream]", iterator: "AsyncIterator[Union[RawMessageStreamEvent, MessageStreamEvent]]", - span: "Span", - integration: "AnthropicIntegration", ) -> "AsyncIterator[Union[RawMessageStreamEvent, MessageStreamEvent]]": """ Sets information received while iterating the response stream on the AI Client Span. - Responsible for closing the AI Client Span. + Responsible for closing the AI Client Span, unless the span has already been closed in the close() patch. """ - model = None - usage = _RecordedUsage() - content_blocks: "list[str]" = [] - response_id = None - + generator_exit = False try: async for event in iterator: # Message and content types are aliases for corresponding Raw* types, introduced in @@ -508,41 +527,25 @@ async def _wrap_asynchronous_message_iterator( yield event continue - ( - model, - usage, - content_blocks, - response_id, - ) = _collect_ai_data( - event, - model, - usage, - content_blocks, - response_id, - ) + _accumulate_event_data(stream, event) yield event + except ( + GeneratorExit + ): # https://docs.python.org/3/reference/expressions.html#generator.close + generator_exit = True + raise finally: with capture_internal_exceptions(): - # Anthropic's input_tokens excludes cached/cache_write tokens. - # Normalize to total input tokens for correct cost calculations. - total_input = ( - usage.input_tokens - + (usage.cache_read_input_tokens or 0) - + (usage.cache_write_input_tokens or 0) - ) - - _set_output_data( - span=span, - integration=integration, - model=model, - input_tokens=total_input, - output_tokens=usage.output_tokens, - cache_read_input_tokens=usage.cache_read_input_tokens, - cache_write_input_tokens=usage.cache_write_input_tokens, - content_blocks=[{"text": "".join(content_blocks), "type": "text"}], - finish_span=True, - response_id=response_id, - ) + if not generator_exit and hasattr(stream, "_span"): + _finish_streaming_span( + stream._span, + stream._integration, + stream._model, + stream._usage, + stream._content_blocks, + stream._response_id, + ) + del stream._span def _set_output_data( @@ -625,17 +628,11 @@ def _sentry_patched_create_common(f: "Any", *args: "Any", **kwargs: "Any") -> "A result = yield f, args, kwargs - if isinstance(result, Stream): + if isinstance(result, (Stream, AsyncStream)): result._span = span result._integration = integration return result - if isinstance(result, AsyncStream): - result._iterator = _wrap_asynchronous_message_iterator( - result._iterator, span, integration - ) - return result - with capture_internal_exceptions(): if hasattr(result, "content"): ( @@ -901,6 +898,94 @@ async def _sentry_patched_create_async(*args: "Any", **kwargs: "Any") -> "Any": return _sentry_patched_create_async +def _wrap_async_stream_aiter( + f: "Callable[..., AsyncIterator[RawMessageStreamEvent]]", +) -> "Callable[..., AsyncIterator[RawMessageStreamEvent]]": + """ + Accumulates output data while iterating. When the returned iterator ends, set + output attributes on the AI Client Span and end the span. + """ + + async def __aiter__(self: "AsyncStream") -> "AsyncIterator[RawMessageStreamEvent]": + if not hasattr(self, "_span"): + async for event in f(self): + yield event + return + + _initialize_data_accumulation_state(self) + async for event in _wrap_asynchronous_message_iterator( + self, + f(self), + ): + yield event + + return __aiter__ + + +def _wrap_async_stream_anext( + f: "Callable[..., Awaitable[RawMessageStreamEvent]]", +) -> "Callable[..., Awaitable[RawMessageStreamEvent]]": + """ + Accumulates output data from the returned event. + """ + + async def __anext__(self: "AsyncStream") -> "RawMessageStreamEvent": + _initialize_data_accumulation_state(self) + try: + event = await f(self) + except StopAsyncIteration: + exc_info = sys.exc_info() + with capture_internal_exceptions(): + if not hasattr(self, "_span"): + raise + + _finish_streaming_span( + self._span, + self._integration, + self._model, + self._usage, + self._content_blocks, + self._response_id, + ) + del self._span + reraise(*exc_info) + + _accumulate_event_data(self, event) + return event + + return __anext__ + + +def _wrap_async_stream_close( + f: "Callable[..., Awaitable[None]]", +) -> "Callable[..., Awaitable[None]]": + """ + Closes the AI Client Span, unless the finally block in `_wrap_synchronous_message_iterator()` runs first. + """ + + async def close(self: "Stream") -> None: + if not hasattr(self, "_span"): + return await f(self) + + if not hasattr(self, "_model"): + self._span.__exit__(None, None, None) + return await f(self) + + _finish_streaming_span( + self._span, + self._integration, + self._model, + self._usage, + self._content_blocks, + self._response_id, + ) + del self._span + + return await f(self) + + return close + + def _wrap_message_stream(f: "Any") -> "Any": """ Attaches user-provided arguments to the returned context manager. @@ -1138,17 +1223,104 @@ async def _sentry_patched_aenter( tools=self._tools, ) - stream._iterator = _wrap_asynchronous_message_iterator( - iterator=stream._iterator, - span=span, - integration=integration, - ) + stream._span = span + stream._integration = integration return stream return _sentry_patched_aenter +def _wrap_async_message_stream_aiter( + f: "Callable[..., AsyncIterator[MessageStreamEvent]]", +) -> "Callable[..., AsyncIterator[MessageStreamEvent]]": + """ + Accumulates output data while iterating. When the returned iterator ends, set + output attributes on the AI Client Span and end the span. + """ + + async def __aiter__( + self: "AsyncMessageStream", + ) -> "AsyncIterator[MessageStreamEvent]": + if not hasattr(self, "_span"): + async for event in f(self): + yield event + return + + _initialize_data_accumulation_state(self) + async for event in _wrap_asynchronous_message_iterator( + self, + f(self), + ): + yield event + + return __aiter__ + + +def _wrap_async_message_stream_anext( + f: "Callable[..., Awaitable[MessageStreamEvent]]", +) -> "Callable[..., Awaitable[MessageStreamEvent]]": + """ + Accumulates output data from the returned event. + """ + + async def __anext__(self: "AsyncMessageStream") -> "MessageStreamEvent": + _initialize_data_accumulation_state(self) + try: + event = await f(self) + except StopAsyncIteration: + exc_info = sys.exc_info() + with capture_internal_exceptions(): + if not hasattr(self, "_span"): + raise + + _finish_streaming_span( + self._span, + self._integration, + self._model, + self._usage, + self._content_blocks, + self._response_id, + ) + del self._span + reraise(*exc_info) + + _accumulate_event_data(self, event) + return event + + return __anext__ + + +def _wrap_async_message_stream_close( + f: "Callable[..., Awaitable[None]]", +) -> "Callable[..., Awaitable[None]]": + """ + Closes the AI Client Span, unless the finally block in `_wrap_synchronous_message_iterator()` runs first. + """ + + async def close(self: "AsyncMessageStream") -> None: + if not hasattr(self, "_span"): + return await f(self) + + if not hasattr(self, "_model"): + self._span.__exit__(None, None, None) + return await f(self) + + _finish_streaming_span( + self._span, + self._integration, + self._model, + self._usage, + self._content_blocks, + self._response_id, + ) + del self._span + + return await f(self) + + return close + + def _is_given(obj: "Any") -> bool: """ Check for givenness safely across different anthropic versions. diff --git a/tests/integrations/anthropic/test_anthropic.py b/tests/integrations/anthropic/test_anthropic.py index bf89037660..4818e47ed8 100644 --- a/tests/integrations/anthropic/test_anthropic.py +++ b/tests/integrations/anthropic/test_anthropic.py @@ -2,6 +2,7 @@ from unittest import mock import json from itertools import islice +from builtins import anext try: from unittest.mock import AsyncMock @@ -974,6 +975,218 @@ async def test_streaming_create_message_async( assert span["data"][SPANDATA.GEN_AI_RESPONSE_ID] == "msg_01XFDUDYJgAACzvnptvVoYEL" +@pytest.mark.asyncio +async def test_streaming_create_message_async_next_consumption( + sentry_init, + capture_events, + get_model_response, + async_iterator, + server_side_event_chunks, +): + client = AsyncAnthropic(api_key="z") + + response = get_model_response( + async_iterator( + server_side_event_chunks( + [ + MessageStartEvent( + message=EXAMPLE_MESSAGE, + type="message_start", + ), + ContentBlockStartEvent( + type="content_block_start", + index=0, + content_block=TextBlock(type="text", text=""), + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="Hi", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text=" I'm Claude!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockStopEvent(type="content_block_stop", index=0), + MessageDeltaEvent( + delta=Delta(), + usage=MessageDeltaUsage(output_tokens=10), + type="message_delta", + ), + ] + ) + ) + ) + + sentry_init( + integrations=[AnthropicIntegration(include_prompts=True)], + traces_sample_rate=1.0, + send_default_pii=True, + ) + events = capture_events() + + messages = [ + { + "role": "user", + "content": "Hello, Claude", + } + ] + + with pytest.raises(StopAsyncIteration), mock.patch.object( + client._client, + "send", + return_value=response, + ) as _: + with start_transaction(name="anthropic"): + messages = await client.messages.create( + max_tokens=1024, messages=messages, model="model", stream=True + ) + + while True: + await anext(messages) + + assert len(events) == 1 + (event,) = events + + assert event["type"] == "transaction" + assert event["transaction"] == "anthropic" + + span = next(span for span in event["spans"] if span["op"] == OP.GEN_AI_CHAT) + + assert span["op"] == OP.GEN_AI_CHAT + assert span["description"] == "chat model" + assert span["data"][SPANDATA.GEN_AI_SYSTEM] == "anthropic" + assert span["data"][SPANDATA.GEN_AI_OPERATION_NAME] == "chat" + assert span["data"][SPANDATA.GEN_AI_REQUEST_MODEL] == "model" + + assert ( + span["data"][SPANDATA.GEN_AI_REQUEST_MESSAGES] + == '[{"role": "user", "content": "Hello, Claude"}]' + ) + assert span["data"][SPANDATA.GEN_AI_RESPONSE_TEXT] == "Hi! I'm Claude!" + + assert span["data"][SPANDATA.GEN_AI_USAGE_INPUT_TOKENS] == 10 + assert span["data"][SPANDATA.GEN_AI_USAGE_OUTPUT_TOKENS] == 10 + assert span["data"][SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS] == 20 + assert span["data"][SPANDATA.GEN_AI_RESPONSE_STREAMING] is True + assert span["data"][SPANDATA.GEN_AI_RESPONSE_ID] == "msg_01XFDUDYJgAACzvnptvVoYEL" + + +@pytest.mark.asyncio +async def test_streaming_create_message_async_iterator_methods( + sentry_init, + capture_events, + get_model_response, + async_iterator, + server_side_event_chunks, +): + client = AsyncAnthropic(api_key="z") + + response = get_model_response( + async_iterator( + server_side_event_chunks( + [ + MessageStartEvent( + message=EXAMPLE_MESSAGE, + type="message_start", + ), + ContentBlockStartEvent( + type="content_block_start", + index=0, + content_block=TextBlock(type="text", text=""), + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="Hi", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text=" I'm Claude!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockStopEvent(type="content_block_stop", index=0), + MessageDeltaEvent( + delta=Delta(), + usage=MessageDeltaUsage(output_tokens=10), + type="message_delta", + ), + ] + ) + ) + ) + + sentry_init( + integrations=[AnthropicIntegration(include_prompts=True)], + traces_sample_rate=1.0, + send_default_pii=True, + ) + events = capture_events() + + messages = [ + { + "role": "user", + "content": "Hello, Claude", + } + ] + + with mock.patch.object( + client._client, + "send", + return_value=response, + ) as _: + with start_transaction(name="anthropic"): + messages = await client.messages.create( + max_tokens=1024, messages=messages, model="model", stream=True + ) + + await anext(messages) + await anext(messages) + + async for item in messages: + break + + await anext(messages) + await messages.close() + + assert len(events) == 1 + (event,) = events + + assert event["type"] == "transaction" + assert event["transaction"] == "anthropic" + + span = next(span for span in event["spans"] if span["op"] == OP.GEN_AI_CHAT) + + assert span["op"] == OP.GEN_AI_CHAT + assert span["description"] == "chat model" + assert span["data"][SPANDATA.GEN_AI_SYSTEM] == "anthropic" + assert span["data"][SPANDATA.GEN_AI_OPERATION_NAME] == "chat" + assert span["data"][SPANDATA.GEN_AI_REQUEST_MODEL] == "model" + + assert ( + span["data"][SPANDATA.GEN_AI_REQUEST_MESSAGES] + == '[{"role": "user", "content": "Hello, Claude"}]' + ) + assert span["data"][SPANDATA.GEN_AI_RESPONSE_TEXT] == "Hi!" + + assert span["data"][SPANDATA.GEN_AI_USAGE_INPUT_TOKENS] == 10 + assert span["data"][SPANDATA.GEN_AI_USAGE_OUTPUT_TOKENS] == 20 + assert span["data"][SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS] == 30 + assert span["data"][SPANDATA.GEN_AI_RESPONSE_STREAMING] is True + assert span["data"][SPANDATA.GEN_AI_RESPONSE_ID] == "msg_01XFDUDYJgAACzvnptvVoYEL" + + @pytest.mark.asyncio @pytest.mark.parametrize( "send_default_pii, include_prompts", @@ -1095,6 +1308,223 @@ async def test_stream_message_async( assert span["data"][SPANDATA.GEN_AI_RESPONSE_ID] == "msg_01XFDUDYJgAACzvnptvVoYEL" +@pytest.mark.asyncio +async def test_stream_messages_async_next_consumption( + sentry_init, + capture_events, + get_model_response, + async_iterator, + server_side_event_chunks, +): + client = AsyncAnthropic(api_key="z") + + response = get_model_response( + async_iterator( + server_side_event_chunks( + [ + MessageStartEvent( + message=EXAMPLE_MESSAGE, + type="message_start", + ), + ContentBlockStartEvent( + type="content_block_start", + index=0, + content_block=TextBlock(type="text", text=""), + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="Hi", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text=" I'm Claude!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockStopEvent(type="content_block_stop", index=0), + MessageDeltaEvent( + delta=Delta(), + usage=MessageDeltaUsage(output_tokens=10), + type="message_delta", + ), + ] + ) + ) + ) + + sentry_init( + integrations=[AnthropicIntegration(include_prompts=True)], + traces_sample_rate=1.0, + send_default_pii=True, + ) + events = capture_events() + + messages = [ + { + "role": "user", + "content": "Hello, Claude", + } + ] + + with pytest.raises(StopAsyncIteration), mock.patch.object( + client._client, + "send", + return_value=response, + ) as _: + with start_transaction(name="anthropic"): + async with client.messages.stream( + max_tokens=1024, + messages=messages, + model="model", + ) as stream: + while True: + await anext(stream) + + assert len(events) == 1 + (event,) = events + + assert event["type"] == "transaction" + assert event["transaction"] == "anthropic" + + span = next(span for span in event["spans"] if span["op"] == OP.GEN_AI_CHAT) + + assert span["op"] == OP.GEN_AI_CHAT + assert span["description"] == "chat model" + assert span["data"][SPANDATA.GEN_AI_SYSTEM] == "anthropic" + assert span["data"][SPANDATA.GEN_AI_OPERATION_NAME] == "chat" + assert span["data"][SPANDATA.GEN_AI_REQUEST_MODEL] == "model" + + assert ( + span["data"][SPANDATA.GEN_AI_REQUEST_MESSAGES] + == '[{"role": "user", "content": "Hello, Claude"}]' + ) + assert span["data"][SPANDATA.GEN_AI_RESPONSE_TEXT] == "Hi! I'm Claude!" + + assert span["data"][SPANDATA.GEN_AI_USAGE_INPUT_TOKENS] == 10 + assert span["data"][SPANDATA.GEN_AI_USAGE_OUTPUT_TOKENS] == 10 + assert span["data"][SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS] == 20 + assert span["data"][SPANDATA.GEN_AI_RESPONSE_STREAMING] is True + assert span["data"][SPANDATA.GEN_AI_RESPONSE_ID] == "msg_01XFDUDYJgAACzvnptvVoYEL" + + +@pytest.mark.asyncio +async def test_stream_messages_async_iterator_methods( + sentry_init, + capture_events, + get_model_response, + async_iterator, + server_side_event_chunks, +): + client = AsyncAnthropic(api_key="z") + + response = get_model_response( + async_iterator( + server_side_event_chunks( + [ + MessageStartEvent( + message=EXAMPLE_MESSAGE, + type="message_start", + ), + ContentBlockStartEvent( + type="content_block_start", + index=0, + content_block=TextBlock(type="text", text=""), + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="Hi", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text=" I'm Claude!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockStopEvent(type="content_block_stop", index=0), + MessageDeltaEvent( + delta=Delta(), + usage=MessageDeltaUsage(output_tokens=10), + type="message_delta", + ), + ] + ) + ) + ) + + sentry_init( + integrations=[AnthropicIntegration(include_prompts=True)], + traces_sample_rate=1.0, + send_default_pii=True, + ) + events = capture_events() + + messages = [ + { + "role": "user", + "content": "Hello, Claude", + } + ] + + with mock.patch.object( + client._client, + "send", + return_value=response, + ) as _: + with start_transaction(name="anthropic"): + async with client.messages.stream( + max_tokens=1024, + messages=messages, + model="model", + ) as stream: + await anext(stream) + await anext(stream) + + async for item in stream: + break + + await anext(stream) + # New versions add TextEvent, so consume one more event. + if TextEvent is not None and isinstance(await anext(stream), TextEvent): + await anext(stream) + await stream.close() + + assert len(events) == 1 + (event,) = events + + assert event["type"] == "transaction" + assert event["transaction"] == "anthropic" + + span = next(span for span in event["spans"] if span["op"] == OP.GEN_AI_CHAT) + + assert span["op"] == OP.GEN_AI_CHAT + assert span["description"] == "chat model" + assert span["data"][SPANDATA.GEN_AI_SYSTEM] == "anthropic" + assert span["data"][SPANDATA.GEN_AI_OPERATION_NAME] == "chat" + assert span["data"][SPANDATA.GEN_AI_REQUEST_MODEL] == "model" + + assert ( + span["data"][SPANDATA.GEN_AI_REQUEST_MESSAGES] + == '[{"role": "user", "content": "Hello, Claude"}]' + ) + assert span["data"][SPANDATA.GEN_AI_RESPONSE_TEXT] == "Hi!" + + assert span["data"][SPANDATA.GEN_AI_USAGE_INPUT_TOKENS] == 10 + assert span["data"][SPANDATA.GEN_AI_USAGE_OUTPUT_TOKENS] == 20 + assert span["data"][SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS] == 30 + assert span["data"][SPANDATA.GEN_AI_RESPONSE_STREAMING] is True + assert span["data"][SPANDATA.GEN_AI_RESPONSE_ID] == "msg_01XFDUDYJgAACzvnptvVoYEL" + + @pytest.mark.skipif( ANTHROPIC_VERSION < (0, 27), reason="Versions <0.27.0 do not include InputJSONDelta, which was introduced in >=0.27.0 along with a new message delta type for tool calling.", From dd26abcaebd8e033915e1a2bb37472e19b6e2005 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Mon, 16 Mar 2026 14:06:31 +0100 Subject: [PATCH 2/7] simplify --- sentry_sdk/integrations/anthropic.py | 15 +++++++----- .../integrations/anthropic/test_anthropic.py | 23 ++++++++++--------- 2 files changed, 21 insertions(+), 17 deletions(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index d49341d16b..71b6915efc 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -135,6 +135,13 @@ def setup_once() -> None: MessageStreamManager.__enter__ ) + # Before https://github.com/anthropics/anthropic-sdk-python/commit/b1a1c0354a9aca450a7d512fdbdeb59c0ead688a + # MessageStream inherits from Stream, so patching Stream is sufficient on these versions. + if not issubclass(MessageStream, Stream): + MessageStream.__iter__ = _wrap_message_stream_iter(MessageStream.__iter__) + MessageStream.__next__ = _wrap_message_stream_next(MessageStream.__next__) + MessageStream.close = _wrap_message_stream_close(MessageStream.close) + AsyncMessages.stream = _wrap_async_message_stream(AsyncMessages.stream) AsyncMessageStreamManager.__aenter__ = ( _wrap_async_message_stream_manager_aenter( @@ -143,12 +150,8 @@ def setup_once() -> None: ) # Before https://github.com/anthropics/anthropic-sdk-python/commit/b1a1c0354a9aca450a7d512fdbdeb59c0ead688a - # MessageStream inherits from Stream, so patching Stream is sufficient on these versions. - if not issubclass(MessageStream, Stream): - MessageStream.__iter__ = _wrap_message_stream_iter(MessageStream.__iter__) - MessageStream.__next__ = _wrap_message_stream_next(MessageStream.__next__) - MessageStream.close = _wrap_message_stream_close(MessageStream.close) - + # AsyncMessageStream inherits from AsyncStream, so patching Stream is sufficient on these versions. + if not issubclass(AsyncMessageStream, AsyncStream): AsyncMessageStream.__aiter__ = _wrap_async_message_stream_aiter( AsyncMessageStream.__aiter__ ) diff --git a/tests/integrations/anthropic/test_anthropic.py b/tests/integrations/anthropic/test_anthropic.py index 4818e47ed8..b250aefccf 100644 --- a/tests/integrations/anthropic/test_anthropic.py +++ b/tests/integrations/anthropic/test_anthropic.py @@ -2,7 +2,6 @@ from unittest import mock import json from itertools import islice -from builtins import anext try: from unittest.mock import AsyncMock @@ -1049,7 +1048,7 @@ async def test_streaming_create_message_async_next_consumption( ) while True: - await anext(messages) + await messages.__anext__() assert len(events) == 1 (event,) = events @@ -1151,13 +1150,13 @@ async def test_streaming_create_message_async_iterator_methods( max_tokens=1024, messages=messages, model="model", stream=True ) - await anext(messages) - await anext(messages) + await messages.__anext__() + await messages.__anext__() async for item in messages: break - await anext(messages) + await messages.__anext__() await messages.close() assert len(events) == 1 @@ -1383,7 +1382,7 @@ async def test_stream_messages_async_next_consumption( model="model", ) as stream: while True: - await anext(stream) + await stream.__anext__() assert len(events) == 1 (event,) = events @@ -1486,16 +1485,18 @@ async def test_stream_messages_async_iterator_methods( messages=messages, model="model", ) as stream: - await anext(stream) - await anext(stream) + await stream.__anext__() + await stream.__anext__() async for item in stream: break - await anext(stream) + await stream.__anext__() # New versions add TextEvent, so consume one more event. - if TextEvent is not None and isinstance(await anext(stream), TextEvent): - await anext(stream) + if TextEvent is not None and isinstance( + await stream.__anext__(), TextEvent + ): + await stream.__anext__() await stream.close() assert len(events) == 1 From c5cd95933f6e81c7923b75c18bd159d5d8c17f65 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Tue, 17 Mar 2026 13:03:38 +0100 Subject: [PATCH 3/7] . --- sentry_sdk/integrations/anthropic.py | 2 ++ tests/integrations/anthropic/test_anthropic.py | 4 ++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index 5a19127545..e447f6e393 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -962,6 +962,7 @@ async def __anext__(self: "AsyncStream") -> "RawMessageStreamEvent": self._usage, self._content_blocks, self._response_id, + self._finish_reason, ) del self._span reraise(*exc_info) @@ -994,6 +995,7 @@ async def close(self: "Stream") -> None: self._usage, self._content_blocks, self._response_id, + self._finish_reason, ) del self._span diff --git a/tests/integrations/anthropic/test_anthropic.py b/tests/integrations/anthropic/test_anthropic.py index 6038917a3b..faf7b4c262 100644 --- a/tests/integrations/anthropic/test_anthropic.py +++ b/tests/integrations/anthropic/test_anthropic.py @@ -1083,6 +1083,7 @@ async def test_streaming_create_message_async_next_consumption( assert span["data"][SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS] == 20 assert span["data"][SPANDATA.GEN_AI_RESPONSE_STREAMING] is True assert span["data"][SPANDATA.GEN_AI_RESPONSE_ID] == "msg_01XFDUDYJgAACzvnptvVoYEL" + assert span["data"][SPANDATA.GEN_AI_RESPONSE_FINISH_REASONS] == ["max_tokens"] @pytest.mark.asyncio @@ -1192,7 +1193,6 @@ async def test_streaming_create_message_async_iterator_methods( assert span["data"][SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS] == 30 assert span["data"][SPANDATA.GEN_AI_RESPONSE_STREAMING] is True assert span["data"][SPANDATA.GEN_AI_RESPONSE_ID] == "msg_01XFDUDYJgAACzvnptvVoYEL" - assert span["data"][SPANDATA.GEN_AI_RESPONSE_FINISH_REASONS] == ["max_tokens"] @pytest.mark.asyncio @@ -1418,6 +1418,7 @@ async def test_stream_messages_async_next_consumption( assert span["data"][SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS] == 20 assert span["data"][SPANDATA.GEN_AI_RESPONSE_STREAMING] is True assert span["data"][SPANDATA.GEN_AI_RESPONSE_ID] == "msg_01XFDUDYJgAACzvnptvVoYEL" + assert span["data"][SPANDATA.GEN_AI_RESPONSE_FINISH_REASONS] == ["max_tokens"] @pytest.mark.asyncio @@ -1533,7 +1534,6 @@ async def test_stream_messages_async_iterator_methods( assert span["data"][SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS] == 30 assert span["data"][SPANDATA.GEN_AI_RESPONSE_STREAMING] is True assert span["data"][SPANDATA.GEN_AI_RESPONSE_ID] == "msg_01XFDUDYJgAACzvnptvVoYEL" - assert span["data"][SPANDATA.GEN_AI_RESPONSE_FINISH_REASONS] == ["max_tokens"] @pytest.mark.skipif( From 8e9bfabbd7211e505c39855eb666f444d19f9559 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Tue, 17 Mar 2026 13:31:37 +0100 Subject: [PATCH 4/7] docstrings --- sentry_sdk/integrations/anthropic.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index 3fcf9d4bed..8727578a0c 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -929,7 +929,8 @@ def _wrap_async_stream_aiter( ) -> "Callable[..., AsyncIterator[RawMessageStreamEvent]]": """ Accumulates output data while iterating. When the returned iterator ends, set - output attributes on the AI Client Span and end the span. + output attributes on the AI Client Span and ends the span (unless the `close()` + or `__next__()` patches have already closed it). """ async def __aiter__(self: "AsyncStream") -> "AsyncIterator[RawMessageStreamEvent]": @@ -953,6 +954,7 @@ def _wrap_async_stream_anext( ) -> "Callable[..., Awaitable[RawMessageStreamEvent]]": """ Accumulates output data from the returned event. + Closes the AI Client Span if `StopIteration` is raised. """ async def __anext__(self: "AsyncStream") -> "RawMessageStreamEvent": @@ -987,7 +989,8 @@ def _wrap_async_stream_close( f: "Callable[..., Awaitable[None]]", ) -> "Callable[..., Awaitable[None]]": """ - Closes the AI Client Span, unless the finally block in `_wrap_synchronous_message_iterator()` runs first. + Closes the AI Client Span, unless the finally block in `_wrap_synchronous_message_iterator()` or + the except block in the `__next__()` patch runs first. """ async def close(self: "Stream") -> None: @@ -1268,7 +1271,8 @@ def _wrap_async_message_stream_aiter( ) -> "Callable[..., AsyncIterator[MessageStreamEvent]]": """ Accumulates output data while iterating. When the returned iterator ends, set - output attributes on the AI Client Span and end the span. + output attributes on the AI Client Span and ends the span (unless the `close()` + or `__next__()` patches have already closed it). """ async def __aiter__( @@ -1294,6 +1298,7 @@ def _wrap_async_message_stream_anext( ) -> "Callable[..., Awaitable[MessageStreamEvent]]": """ Accumulates output data from the returned event. + Closes the AI Client Span if `StopIteration` is raised. """ async def __anext__(self: "AsyncMessageStream") -> "MessageStreamEvent": @@ -1328,7 +1333,8 @@ def _wrap_async_message_stream_close( f: "Callable[..., Awaitable[None]]", ) -> "Callable[..., Awaitable[None]]": """ - Closes the AI Client Span, unless the finally block in `_wrap_synchronous_message_iterator()` runs first. + Closes the AI Client Span, unless the finally block in `_wrap_synchronous_message_iterator()` or + the except block in the `__next__()` patch runs first. """ async def close(self: "AsyncMessageStream") -> None: From fab5d93b6503af639e7362ab0f62899ef21116e0 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Tue, 17 Mar 2026 13:46:51 +0100 Subject: [PATCH 5/7] type annotation --- sentry_sdk/integrations/anthropic.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index 8727578a0c..9056b8dbd3 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -993,7 +993,7 @@ def _wrap_async_stream_close( the except block in the `__next__()` patch runs first. """ - async def close(self: "Stream") -> None: + async def close(self: "AsyncStream") -> None: if not hasattr(self, "_span"): return await f(self) From e17e0364090c01af3979cccd3027b727319ff9ae Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Tue, 17 Mar 2026 13:57:04 +0100 Subject: [PATCH 6/7] review --- sentry_sdk/integrations/anthropic.py | 50 +++++++++++++--------------- 1 file changed, 24 insertions(+), 26 deletions(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index 7f7dbae534..4f54f18cf9 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -963,19 +963,17 @@ async def __anext__(self: "AsyncStream") -> "RawMessageStreamEvent": except StopAsyncIteration: exc_info = sys.exc_info() with capture_internal_exceptions(): - if not hasattr(self, "_span"): - raise - - _finish_streaming_span( - self._span, - self._integration, - self._model, - self._usage, - self._content_blocks, - self._response_id, - self._finish_reason, - ) - del self._span + if hasattr(self, "_span"): + _finish_streaming_span( + self._span, + self._integration, + self._model, + self._usage, + self._content_blocks, + self._response_id, + self._finish_reason, + ) + del self._span reraise(*exc_info) _accumulate_event_data(self, event) @@ -998,6 +996,7 @@ async def close(self: "AsyncStream") -> None: if not hasattr(self, "_model"): self._span.__exit__(None, None, None) + del self._span return await f(self) _finish_streaming_span( @@ -1306,19 +1305,17 @@ async def __anext__(self: "AsyncMessageStream") -> "MessageStreamEvent": except StopAsyncIteration: exc_info = sys.exc_info() with capture_internal_exceptions(): - if not hasattr(self, "_span"): - raise - - _finish_streaming_span( - self._span, - self._integration, - self._model, - self._usage, - self._content_blocks, - self._response_id, - self._finish_reason, - ) - del self._span + if hasattr(self, "_span"): + _finish_streaming_span( + self._span, + self._integration, + self._model, + self._usage, + self._content_blocks, + self._response_id, + self._finish_reason, + ) + del self._span reraise(*exc_info) _accumulate_event_data(self, event) @@ -1341,6 +1338,7 @@ async def close(self: "AsyncMessageStream") -> None: if not hasattr(self, "_model"): self._span.__exit__(None, None, None) + del self._span return await f(self) _finish_streaming_span( From e2f7afc2cf38472318d1362273ace897b6b85cc5 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Tue, 17 Mar 2026 16:03:40 +0100 Subject: [PATCH 7/7] . --- sentry_sdk/integrations/anthropic.py | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index 59ff89a868..1dcef160de 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -498,7 +498,6 @@ async def _wrap_asynchronous_message_iterator( Sets information received while iterating the response stream on the AI Client Span. Responsible for closing the AI Client Span, unless the span has already been closed in the close() patch. """ - generator_exit = False try: async for event in iterator: # Message and content types are aliases for corresponding Raw* types, introduced in @@ -519,14 +518,9 @@ async def _wrap_asynchronous_message_iterator( _accumulate_event_data(stream, event) yield event - except ( - GeneratorExit - ): # https://docs.python.org/3/reference/expressions.html#generator.close - generator_exit = True - raise finally: with capture_internal_exceptions(): - if not generator_exit and hasattr(stream, "_span"): + if hasattr(stream, "_span"): _finish_streaming_span( span=stream._span, integration=stream._integration,