From e3cc76d4516c8d4945d92c9b5f8846f69a36b3cf Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Mon, 16 Mar 2026 11:24:31 +0100 Subject: [PATCH 01/11] fix(anthropic): Respect iterator protocol in synchronous streamed responses --- sentry_sdk/integrations/anthropic.py | 334 +++++++++++++-- .../integrations/anthropic/test_anthropic.py | 405 ++++++++++++++++++ 2 files changed, 692 insertions(+), 47 deletions(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index bc208ac4f5..efb8638642 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -39,7 +39,11 @@ from anthropic import Stream, AsyncStream from anthropic.resources import AsyncMessages, Messages - from anthropic.lib.streaming import MessageStreamManager, AsyncMessageStreamManager + from anthropic.lib.streaming import ( + MessageStreamManager, + MessageStream, + AsyncMessageStreamManager, + ) from anthropic.types import ( MessageStartEvent, @@ -56,7 +60,7 @@ raise DidNotEnable("Anthropic not installed") if TYPE_CHECKING: - from typing import Any, AsyncIterator, Iterator, Optional, Union + from typing import Any, AsyncIterator, Iterator, Optional, Union, Callable from sentry_sdk.tracing import Span from sentry_sdk._types import TextPart @@ -67,7 +71,7 @@ TextBlockParam, ToolUnionParam, ) - from anthropic.lib.streaming import MessageStream, AsyncMessageStream + from anthropic.lib.streaming import AsyncMessageStream class _RecordedUsage: @@ -89,13 +93,36 @@ def setup_once() -> None: version = package_version("anthropic") _check_minimum_version(AnthropicIntegration, version) + """ + client.messages.create(stream=True) returns an instance of the Stream class, which implements the iterator protocol. + The underlying stream can be consumed using either __iter__ or __next__, so both are patched to intercept + streamed events. The streamed events are used to populate output attributes on the AI Client Span. + + The close() method is patched for situations in which the method is directly invoked by the user, and otherwise + the finally block in the __iter__ patch closes the span. + """ Messages.create = _wrap_message_create(Messages.create) + Stream.__iter__ = _wrap_stream_iter(Stream.__iter__) + Stream.__next__ = _wrap_stream_next(Stream.__next__) + Stream.close = _wrap_stream_close(Stream.close) + AsyncMessages.create = _wrap_message_create_async(AsyncMessages.create) + """ + client.messages.stream() returns an instance of the MessageStream class, which implements the iterator protocol. + The underlying stream can be consumed using either __iter__ or __next__, so both are patched to intercept + streamed events. The streamed events are used to populate output attributes on the AI Client Span. + + The close() method is patched for situations in which the method is directly invoked by the user, and otherwise + the finally block in the __iter__ patch closes the span. + """ Messages.stream = _wrap_message_stream(Messages.stream) MessageStreamManager.__enter__ = _wrap_message_stream_manager_enter( MessageStreamManager.__enter__ ) + MessageStream.__iter__ = _wrap_message_stream_iter(MessageStream.__iter__) + MessageStream.__next__ = _wrap_message_stream_next(MessageStream.__next__) + MessageStream.close = _wrap_message_stream_close(MessageStream.close) AsyncMessages.stream = _wrap_async_message_stream(AsyncMessages.stream) AsyncMessageStreamManager.__aenter__ = ( @@ -398,20 +425,14 @@ def _set_create_input_data( def _wrap_synchronous_message_iterator( + stream: "Union[Stream, MessageStream]", iterator: "Iterator[Union[RawMessageStreamEvent, MessageStreamEvent]]", - span: "Span", - integration: "AnthropicIntegration", ) -> "Iterator[Union[RawMessageStreamEvent, MessageStreamEvent]]": """ Sets information received while iterating the response stream on the AI Client Span. - Responsible for closing the AI Client Span. + Responsible for closing the AI Client Span, unless the span has already been closed in the close() patch. """ - - model = None - usage = _RecordedUsage() - content_blocks: "list[str]" = [] - response_id = None - + generator_exit = False try: for event in iterator: # Message and content types are aliases for corresponding Raw* types, introduced in @@ -430,36 +451,25 @@ def _wrap_synchronous_message_iterator( yield event continue - (model, usage, content_blocks, response_id) = _collect_ai_data( - event, - model, - usage, - content_blocks, - response_id, - ) + _accumulate_event_data(stream, event) yield event + except ( + GeneratorExit + ): # https://docs.python.org/3/reference/expressions.html#generator.close + generator_exit = True + raise finally: with capture_internal_exceptions(): - # Anthropic's input_tokens excludes cached/cache_write tokens. - # Normalize to total input tokens for correct cost calculations. - total_input = ( - usage.input_tokens - + (usage.cache_read_input_tokens or 0) - + (usage.cache_write_input_tokens or 0) - ) - - _set_output_data( - span=span, - integration=integration, - model=model, - input_tokens=total_input, - output_tokens=usage.output_tokens, - cache_read_input_tokens=usage.cache_read_input_tokens, - cache_write_input_tokens=usage.cache_write_input_tokens, - content_blocks=[{"text": "".join(content_blocks), "type": "text"}], - finish_span=True, - response_id=response_id, - ) + if not generator_exit and hasattr(stream, "_span"): + _finish_streaming_span( + stream._span, + stream._integration, + stream._model, + stream._usage, + stream._content_blocks, + stream._response_id, + ) + del stream._span async def _wrap_asynchronous_message_iterator( @@ -612,9 +622,8 @@ def _sentry_patched_create_common(f: "Any", *args: "Any", **kwargs: "Any") -> "A result = yield f, args, kwargs if isinstance(result, Stream): - result._iterator = _wrap_synchronous_message_iterator( - result._iterator, span, integration - ) + result._span = span + result._integration = integration return result if isinstance(result, AsyncStream): @@ -698,6 +707,155 @@ def _sentry_patched_create_sync(*args: "Any", **kwargs: "Any") -> "Any": return _sentry_patched_create_sync +def _initialize_data_accumulation_state(stream: "Union[Stream, MessageStream]"): + """ + Initialize fields for accumulating output on the Stream instance. + """ + if not hasattr(stream, "_model"): + stream._model = None + stream._usage = _RecordedUsage() + stream._content_blocks: "list[str]" = [] + stream._response_id = None + + +def _accumulate_event_data( + self, event: "Union[RawMessageStreamEvent, MessageStreamEvent]" +): + """ + Update accumulated output from a single stream event. + """ + (model, usage, content_blocks, response_id) = _collect_ai_data( + event, + self._model, + self._usage, + self._content_blocks, + self._response_id, + ) + + self._model = model + self._usage = usage + self._content_blocks = content_blocks + self._response_id = response_id + + +def _finish_streaming_span( + span: "Span", + integration: "AnthropicIntegration", + model: "Optional[str]", + usage: "_RecordedUsage", + content_blocks: "list[str]", + response_id: "Optional[str]", +): + """ + Set output attributes on the AI Client Span and end the span. + """ + # Anthropic's input_tokens excludes cached/cache_write tokens. + # Normalize to total input tokens for correct cost calculations. + total_input = ( + usage.input_tokens + + (usage.cache_read_input_tokens or 0) + + (usage.cache_write_input_tokens or 0) + ) + + _set_output_data( + span=span, + integration=integration, + model=model, + input_tokens=total_input, + output_tokens=usage.output_tokens, + cache_read_input_tokens=usage.cache_read_input_tokens, + cache_write_input_tokens=usage.cache_write_input_tokens, + content_blocks=[{"text": "".join(content_blocks), "type": "text"}], + finish_span=True, + response_id=response_id, + ) + + +def _wrap_stream_iter( + f: "Callable[..., Iterator[RawMessageStreamEvent]]", +) -> "Callable[..., Iterator[RawMessageStreamEvent]]": + """ + Accumulates output data while iterating. When the returned iterator ends, set + output attributes on the AI Client Span and end the span. + """ + + def __iter__(self) -> "Iterator[RawMessageStreamEvent]": + if not hasattr(self, "_span"): + for event in f(self): + yield event + return + + _initialize_data_accumulation_state(self) + yield from _wrap_synchronous_message_iterator( + self, + f(self), + ) + + return __iter__ + + +def _wrap_stream_next( + f: "Callable[..., RawMessageStreamEvent]", +) -> "Callable[..., RawMessageStreamEvent]": + """ + Accumulates output data from the returned event. + """ + + def __next__(self) -> "RawMessageStreamEvent": + _initialize_data_accumulation_state(self) + try: + event = f(self) + except StopIteration: + if not hasattr(self, "_span"): + raise + + _finish_streaming_span( + self._span, + self._integration, + self._model, + self._usage, + self._content_blocks, + self._response_id, + ) + del self._span + raise + + _accumulate_event_data(self, event) + return event + + return __next__ + + +def _wrap_stream_close( + f: "Callable[..., None]", +) -> "Callable[..., None]": + """ + Closes the AI Client Span, unless the finally block in `_wrap_synchronous_message_iterator()` runs first. + """ + + def close(self) -> None: + if not hasattr(self, "_span"): + return f(self) + + if not hasattr(self, "_model"): + self._span.__exit__(None, None, None) + return f(self) + + _finish_streaming_span( + self._span, + self._integration, + self._model, + self._usage, + self._content_blocks, + self._response_id, + ) + del self._span + + return f(self) + + return close + + def _wrap_message_create_async(f: "Any") -> "Any": async def _execute_async(f: "Any", *args: "Any", **kwargs: "Any") -> "Any": gen = _sentry_patched_create_common(f, *args, **kwargs) @@ -805,17 +963,99 @@ def _sentry_patched_enter(self: "MessageStreamManager") -> "MessageStream": tools=self._tools, ) - stream._iterator = _wrap_synchronous_message_iterator( - iterator=stream._iterator, - span=span, - integration=integration, - ) + stream._span = span + stream._integration = integration return stream return _sentry_patched_enter +def _wrap_message_stream_iter( + f: "Callable[..., Iterator[MessageStreamEvent]]", +) -> "Callable[..., Iterator[MessageStreamEvent]]": + """ + Accumulates output data while iterating. When the returned iterator ends, set + output attributes on the AI Client Span and end the span. + """ + + def __iter__(self) -> "Iterator[MessageStreamEvent]": + if not hasattr(self, "_span"): + for event in f(self): + yield event + return + + _initialize_data_accumulation_state(self) + yield from _wrap_synchronous_message_iterator( + self, + f(self), + ) + + return __iter__ + + +def _wrap_message_stream_next( + f: "Callable[..., MessageStreamEvent]", +) -> "Callable[..., MessageStreamEvent]": + """ + Accumulates output data from the returned event. + """ + + def __next__(self) -> "MessageStreamEvent": + _initialize_data_accumulation_state(self) + try: + event = f(self) + except StopIteration: + if not hasattr(self, "_span"): + raise + + _finish_streaming_span( + self._span, + self._integration, + self._model, + self._usage, + self._content_blocks, + self._response_id, + ) + del self._span + raise + + _accumulate_event_data(self, event) + return event + + return __next__ + + +def _wrap_message_stream_close( + f: "Callable[..., None]", +) -> "Callable[..., None]": + """ + Closes the AI Client Span, unless the finally block in `_wrap_synchronous_message_iterator()` runs first. + """ + + def close(self) -> None: + if not hasattr(self, "_span"): + return f(self) + + if not hasattr(self, "_model"): + self._span.__exit__(None, None, None) + return f(self) + + _finish_streaming_span( + self._span, + self._integration, + self._model, + self._usage, + self._content_blocks, + self._response_id, + ) + del self._span + + return f(self) + + return close + + def _wrap_async_message_stream(f: "Any") -> "Any": """ Attaches user-provided arguments to the returned context manager. diff --git a/tests/integrations/anthropic/test_anthropic.py b/tests/integrations/anthropic/test_anthropic.py index 3a854e3a4e..e3cde11b62 100644 --- a/tests/integrations/anthropic/test_anthropic.py +++ b/tests/integrations/anthropic/test_anthropic.py @@ -1,6 +1,7 @@ import pytest from unittest import mock import json +from itertools import islice try: from unittest.mock import AsyncMock @@ -325,6 +326,207 @@ def test_streaming_create_message( assert span["data"][SPANDATA.GEN_AI_RESPONSE_ID] == "msg_01XFDUDYJgAACzvnptvVoYEL" +def test_streaming_create_message_next_consumption( + sentry_init, + capture_events, + get_model_response, + server_side_event_chunks, +): + client = Anthropic(api_key="z") + + response = get_model_response( + server_side_event_chunks( + [ + MessageStartEvent( + message=EXAMPLE_MESSAGE, + type="message_start", + ), + ContentBlockStartEvent( + type="content_block_start", + index=0, + content_block=TextBlock(type="text", text=""), + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="Hi", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text=" I'm Claude!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockStopEvent(type="content_block_stop", index=0), + MessageDeltaEvent( + delta=Delta(), + usage=MessageDeltaUsage(output_tokens=10), + type="message_delta", + ), + ] + ) + ) + + sentry_init( + integrations=[AnthropicIntegration(include_prompts=True)], + traces_sample_rate=1.0, + send_default_pii=True, + ) + events = capture_events() + + messages = [ + { + "role": "user", + "content": "Hello, Claude", + } + ] + + with pytest.raises(StopIteration), mock.patch.object( + client._client, + "send", + return_value=response, + ) as _: + with start_transaction(name="anthropic"): + messages = client.messages.create( + max_tokens=1024, messages=messages, model="model", stream=True + ) + + while True: + next(messages) + + assert len(events) == 1 + (event,) = events + + assert event["type"] == "transaction" + assert event["transaction"] == "anthropic" + + span = next(span for span in event["spans"] if span["op"] == OP.GEN_AI_CHAT) + + assert span["op"] == OP.GEN_AI_CHAT + assert span["description"] == "chat model" + assert span["data"][SPANDATA.GEN_AI_SYSTEM] == "anthropic" + assert span["data"][SPANDATA.GEN_AI_OPERATION_NAME] == "chat" + assert span["data"][SPANDATA.GEN_AI_REQUEST_MODEL] == "model" + + assert ( + span["data"][SPANDATA.GEN_AI_REQUEST_MESSAGES] + == '[{"role": "user", "content": "Hello, Claude"}]' + ) + assert span["data"][SPANDATA.GEN_AI_RESPONSE_TEXT] == "Hi! I'm Claude!" + + assert span["data"][SPANDATA.GEN_AI_USAGE_INPUT_TOKENS] == 10 + assert span["data"][SPANDATA.GEN_AI_USAGE_OUTPUT_TOKENS] == 10 + assert span["data"][SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS] == 20 + assert span["data"][SPANDATA.GEN_AI_RESPONSE_STREAMING] is True + assert span["data"][SPANDATA.GEN_AI_RESPONSE_ID] == "msg_01XFDUDYJgAACzvnptvVoYEL" + + +def test_streaming_create_message_iterator_methods( + sentry_init, + capture_events, + get_model_response, + server_side_event_chunks, +): + client = Anthropic(api_key="z") + + response = get_model_response( + server_side_event_chunks( + [ + MessageStartEvent( + message=EXAMPLE_MESSAGE, + type="message_start", + ), + ContentBlockStartEvent( + type="content_block_start", + index=0, + content_block=TextBlock(type="text", text=""), + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="Hi", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text=" I'm Claude!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockStopEvent(type="content_block_stop", index=0), + MessageDeltaEvent( + delta=Delta(), + usage=MessageDeltaUsage(output_tokens=10), + type="message_delta", + ), + ] + ) + ) + + sentry_init( + integrations=[AnthropicIntegration(include_prompts=True)], + traces_sample_rate=1.0, + send_default_pii=True, + ) + events = capture_events() + + messages = [ + { + "role": "user", + "content": "Hello, Claude", + } + ] + + with mock.patch.object( + client._client, + "send", + return_value=response, + ) as _: + with start_transaction(name="anthropic"): + messages = client.messages.create( + max_tokens=1024, messages=messages, model="model", stream=True + ) + + next(messages) + next(messages) + list(islice(messages, 1)) + next(messages) + messages.close() + + assert len(events) == 1 + (event,) = events + + assert event["type"] == "transaction" + assert event["transaction"] == "anthropic" + + span = next(span for span in event["spans"] if span["op"] == OP.GEN_AI_CHAT) + + assert span["op"] == OP.GEN_AI_CHAT + assert span["description"] == "chat model" + assert span["data"][SPANDATA.GEN_AI_SYSTEM] == "anthropic" + assert span["data"][SPANDATA.GEN_AI_OPERATION_NAME] == "chat" + assert span["data"][SPANDATA.GEN_AI_REQUEST_MODEL] == "model" + + assert ( + span["data"][SPANDATA.GEN_AI_REQUEST_MESSAGES] + == '[{"role": "user", "content": "Hello, Claude"}]' + ) + assert span["data"][SPANDATA.GEN_AI_RESPONSE_TEXT] == "Hi!" + + assert span["data"][SPANDATA.GEN_AI_USAGE_INPUT_TOKENS] == 10 + assert span["data"][SPANDATA.GEN_AI_USAGE_OUTPUT_TOKENS] == 20 + assert span["data"][SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS] == 30 + assert span["data"][SPANDATA.GEN_AI_RESPONSE_STREAMING] is True + assert span["data"][SPANDATA.GEN_AI_RESPONSE_ID] == "msg_01XFDUDYJgAACzvnptvVoYEL" + + @pytest.mark.parametrize( "send_default_pii, include_prompts", [ @@ -441,6 +643,209 @@ def test_stream_messages( assert span["data"][SPANDATA.GEN_AI_RESPONSE_ID] == "msg_01XFDUDYJgAACzvnptvVoYEL" +def test_stream_messages_next_consumption( + sentry_init, + capture_events, + get_model_response, + server_side_event_chunks, +): + client = Anthropic(api_key="z") + + response = get_model_response( + server_side_event_chunks( + [ + MessageStartEvent( + message=EXAMPLE_MESSAGE, + type="message_start", + ), + ContentBlockStartEvent( + type="content_block_start", + index=0, + content_block=TextBlock(type="text", text=""), + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="Hi", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text=" I'm Claude!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockStopEvent(type="content_block_stop", index=0), + MessageDeltaEvent( + delta=Delta(), + usage=MessageDeltaUsage(output_tokens=10), + type="message_delta", + ), + ] + ) + ) + + sentry_init( + integrations=[AnthropicIntegration(include_prompts=True)], + traces_sample_rate=1.0, + send_default_pii=True, + ) + events = capture_events() + + messages = [ + { + "role": "user", + "content": "Hello, Claude", + } + ] + + with pytest.raises(StopIteration), mock.patch.object( + client._client, + "send", + return_value=response, + ) as _: + with start_transaction(name="anthropic"): + with client.messages.stream( + max_tokens=1024, + messages=messages, + model="model", + ) as stream: + while True: + next(stream) + + assert len(events) == 1 + (event,) = events + + assert event["type"] == "transaction" + assert event["transaction"] == "anthropic" + + span = next(span for span in event["spans"] if span["op"] == OP.GEN_AI_CHAT) + + assert span["op"] == OP.GEN_AI_CHAT + assert span["description"] == "chat model" + assert span["data"][SPANDATA.GEN_AI_SYSTEM] == "anthropic" + assert span["data"][SPANDATA.GEN_AI_OPERATION_NAME] == "chat" + assert span["data"][SPANDATA.GEN_AI_REQUEST_MODEL] == "model" + + assert ( + span["data"][SPANDATA.GEN_AI_REQUEST_MESSAGES] + == '[{"role": "user", "content": "Hello, Claude"}]' + ) + assert span["data"][SPANDATA.GEN_AI_RESPONSE_TEXT] == "Hi! I'm Claude!" + + assert span["data"][SPANDATA.GEN_AI_USAGE_INPUT_TOKENS] == 10 + assert span["data"][SPANDATA.GEN_AI_USAGE_OUTPUT_TOKENS] == 10 + assert span["data"][SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS] == 20 + assert span["data"][SPANDATA.GEN_AI_RESPONSE_STREAMING] is True + assert span["data"][SPANDATA.GEN_AI_RESPONSE_ID] == "msg_01XFDUDYJgAACzvnptvVoYEL" + + +def test_stream_messages_iterator_methods( + sentry_init, + capture_events, + get_model_response, + server_side_event_chunks, +): + client = Anthropic(api_key="z") + + response = get_model_response( + server_side_event_chunks( + [ + MessageStartEvent( + message=EXAMPLE_MESSAGE, + type="message_start", + ), + ContentBlockStartEvent( + type="content_block_start", + index=0, + content_block=TextBlock(type="text", text=""), + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="Hi", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text=" I'm Claude!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockStopEvent(type="content_block_stop", index=0), + MessageDeltaEvent( + delta=Delta(), + usage=MessageDeltaUsage(output_tokens=10), + type="message_delta", + ), + ] + ) + ) + + sentry_init( + integrations=[AnthropicIntegration(include_prompts=True)], + traces_sample_rate=1.0, + send_default_pii=True, + ) + events = capture_events() + + messages = [ + { + "role": "user", + "content": "Hello, Claude", + } + ] + + with mock.patch.object( + client._client, + "send", + return_value=response, + ) as _: + with start_transaction(name="anthropic"): + with client.messages.stream( + max_tokens=1024, + messages=messages, + model="model", + ) as stream: + next(stream) + next(stream) + list(islice(stream, 2)) + next(stream) + stream.close() + + assert len(events) == 1 + (event,) = events + + assert event["type"] == "transaction" + assert event["transaction"] == "anthropic" + + span = next(span for span in event["spans"] if span["op"] == OP.GEN_AI_CHAT) + + assert span["op"] == OP.GEN_AI_CHAT + assert span["description"] == "chat model" + assert span["data"][SPANDATA.GEN_AI_SYSTEM] == "anthropic" + assert span["data"][SPANDATA.GEN_AI_OPERATION_NAME] == "chat" + assert span["data"][SPANDATA.GEN_AI_REQUEST_MODEL] == "model" + + assert ( + span["data"][SPANDATA.GEN_AI_REQUEST_MESSAGES] + == '[{"role": "user", "content": "Hello, Claude"}]' + ) + assert span["data"][SPANDATA.GEN_AI_RESPONSE_TEXT] == "Hi!" + + assert span["data"][SPANDATA.GEN_AI_USAGE_INPUT_TOKENS] == 10 + assert span["data"][SPANDATA.GEN_AI_USAGE_OUTPUT_TOKENS] == 20 + assert span["data"][SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS] == 30 + assert span["data"][SPANDATA.GEN_AI_RESPONSE_STREAMING] is True + assert span["data"][SPANDATA.GEN_AI_RESPONSE_ID] == "msg_01XFDUDYJgAACzvnptvVoYEL" + + @pytest.mark.asyncio @pytest.mark.parametrize( "send_default_pii, include_prompts", From 7bd7ed42c9efb24eb94ae07ef4b9f76c8c44db44 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Mon, 16 Mar 2026 12:00:50 +0100 Subject: [PATCH 02/11] version check and typing --- sentry_sdk/integrations/anthropic.py | 78 ++++++++++--------- .../integrations/anthropic/test_anthropic.py | 7 +- 2 files changed, 48 insertions(+), 37 deletions(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index efb8638642..5a0b511708 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -120,9 +120,13 @@ def setup_once() -> None: MessageStreamManager.__enter__ = _wrap_message_stream_manager_enter( MessageStreamManager.__enter__ ) - MessageStream.__iter__ = _wrap_message_stream_iter(MessageStream.__iter__) - MessageStream.__next__ = _wrap_message_stream_next(MessageStream.__next__) - MessageStream.close = _wrap_message_stream_close(MessageStream.close) + + # Before https://github.com/anthropics/anthropic-sdk-python/commit/b1a1c0354a9aca450a7d512fdbdeb59c0ead688a + # MessageStream inherits from Stream, so patching Stream is sufficient on these versions. + if version >= (0, 26, 2): + MessageStream.__iter__ = _wrap_message_stream_iter(MessageStream.__iter__) + MessageStream.__next__ = _wrap_message_stream_next(MessageStream.__next__) + MessageStream.close = _wrap_message_stream_close(MessageStream.close) AsyncMessages.stream = _wrap_async_message_stream(AsyncMessages.stream) AsyncMessageStreamManager.__aenter__ = ( @@ -779,7 +783,7 @@ def _wrap_stream_iter( output attributes on the AI Client Span and end the span. """ - def __iter__(self) -> "Iterator[RawMessageStreamEvent]": + def __iter__(self: "Stream") -> "Iterator[RawMessageStreamEvent]": if not hasattr(self, "_span"): for event in f(self): yield event @@ -801,24 +805,26 @@ def _wrap_stream_next( Accumulates output data from the returned event. """ - def __next__(self) -> "RawMessageStreamEvent": + def __next__(self: "Stream") -> "RawMessageStreamEvent": _initialize_data_accumulation_state(self) try: event = f(self) except StopIteration: - if not hasattr(self, "_span"): - raise - - _finish_streaming_span( - self._span, - self._integration, - self._model, - self._usage, - self._content_blocks, - self._response_id, - ) - del self._span - raise + exc_info = sys.exc_info() + with capture_internal_exceptions(): + if not hasattr(self, "_span"): + raise + + _finish_streaming_span( + self._span, + self._integration, + self._model, + self._usage, + self._content_blocks, + self._response_id, + ) + del self._span + reraise(*exc_info) _accumulate_event_data(self, event) return event @@ -833,7 +839,7 @@ def _wrap_stream_close( Closes the AI Client Span, unless the finally block in `_wrap_synchronous_message_iterator()` runs first. """ - def close(self) -> None: + def close(self: "Stream") -> None: if not hasattr(self, "_span"): return f(self) @@ -979,7 +985,7 @@ def _wrap_message_stream_iter( output attributes on the AI Client Span and end the span. """ - def __iter__(self) -> "Iterator[MessageStreamEvent]": + def __iter__(self: "MessageStream") -> "Iterator[MessageStreamEvent]": if not hasattr(self, "_span"): for event in f(self): yield event @@ -1001,24 +1007,26 @@ def _wrap_message_stream_next( Accumulates output data from the returned event. """ - def __next__(self) -> "MessageStreamEvent": + def __next__(self: "MessageStream") -> "MessageStreamEvent": _initialize_data_accumulation_state(self) try: event = f(self) except StopIteration: - if not hasattr(self, "_span"): - raise - - _finish_streaming_span( - self._span, - self._integration, - self._model, - self._usage, - self._content_blocks, - self._response_id, - ) - del self._span - raise + exc_info = sys.exc_info() + with capture_internal_exceptions(): + if not hasattr(self, "_span"): + raise + + _finish_streaming_span( + self._span, + self._integration, + self._model, + self._usage, + self._content_blocks, + self._response_id, + ) + del self._span + reraise(*exc_info) _accumulate_event_data(self, event) return event @@ -1033,7 +1041,7 @@ def _wrap_message_stream_close( Closes the AI Client Span, unless the finally block in `_wrap_synchronous_message_iterator()` runs first. """ - def close(self) -> None: + def close(self: "MessageStream") -> None: if not hasattr(self, "_span"): return f(self) diff --git a/tests/integrations/anthropic/test_anthropic.py b/tests/integrations/anthropic/test_anthropic.py index e3cde11b62..49c32abf75 100644 --- a/tests/integrations/anthropic/test_anthropic.py +++ b/tests/integrations/anthropic/test_anthropic.py @@ -13,6 +13,7 @@ async def __call__(self, *args, **kwargs): from anthropic import Anthropic, AnthropicError, AsyncAnthropic +from anthropic.lib.streaming import TextEvent from anthropic.types import MessageDeltaUsage, TextDelta, Usage from anthropic.types.content_block_delta_event import ContentBlockDeltaEvent from anthropic.types.content_block_start_event import ContentBlockStartEvent @@ -815,8 +816,10 @@ def test_stream_messages_iterator_methods( ) as stream: next(stream) next(stream) - list(islice(stream, 2)) - next(stream) + list(islice(stream, 1)) + # New versions add TextEvent, so consume one more event. + if isinstance(next(stream), TextEvent): + next(stream) stream.close() assert len(events) == 1 From c8a6ec306f1d413295b9dbbd213317fcc7c1241e Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Mon, 16 Mar 2026 12:03:40 +0100 Subject: [PATCH 03/11] fix tests --- tests/integrations/anthropic/test_anthropic.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/tests/integrations/anthropic/test_anthropic.py b/tests/integrations/anthropic/test_anthropic.py index 49c32abf75..d9d5c0c30e 100644 --- a/tests/integrations/anthropic/test_anthropic.py +++ b/tests/integrations/anthropic/test_anthropic.py @@ -13,7 +13,6 @@ async def __call__(self, *args, **kwargs): from anthropic import Anthropic, AnthropicError, AsyncAnthropic -from anthropic.lib.streaming import TextEvent from anthropic.types import MessageDeltaUsage, TextDelta, Usage from anthropic.types.content_block_delta_event import ContentBlockDeltaEvent from anthropic.types.content_block_start_event import ContentBlockStartEvent @@ -30,6 +29,11 @@ async def __call__(self, *args, **kwargs): except ImportError: pass +try: + from anthropic.lib.streaming import TextEvent +except ImportError: + TextEvent = None + try: # 0.27+ from anthropic.types.raw_message_delta_event import Delta @@ -818,7 +822,7 @@ def test_stream_messages_iterator_methods( next(stream) list(islice(stream, 1)) # New versions add TextEvent, so consume one more event. - if isinstance(next(stream), TextEvent): + if TextEvent is not None and isinstance(next(stream), TextEvent): next(stream) stream.close() From 6f1d833b6d290bd413cee40d4afe55981615a2cc Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Mon, 16 Mar 2026 12:07:01 +0100 Subject: [PATCH 04/11] missing types --- sentry_sdk/integrations/anthropic.py | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index 5a0b511708..27acdafbe4 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -711,35 +711,36 @@ def _sentry_patched_create_sync(*args: "Any", **kwargs: "Any") -> "Any": return _sentry_patched_create_sync -def _initialize_data_accumulation_state(stream: "Union[Stream, MessageStream]"): +def _initialize_data_accumulation_state(stream: "Union[Stream, MessageStream]") -> None: """ Initialize fields for accumulating output on the Stream instance. """ if not hasattr(stream, "_model"): stream._model = None stream._usage = _RecordedUsage() - stream._content_blocks: "list[str]" = [] + stream._content_blocks = [] stream._response_id = None def _accumulate_event_data( - self, event: "Union[RawMessageStreamEvent, MessageStreamEvent]" -): + stream: "Union[Stream, MessageStream]", + event: "Union[RawMessageStreamEvent, MessageStreamEvent]", +) -> None: """ Update accumulated output from a single stream event. """ (model, usage, content_blocks, response_id) = _collect_ai_data( event, - self._model, - self._usage, - self._content_blocks, - self._response_id, + stream._model, + stream._usage, + stream._content_blocks, + stream._response_id, ) - self._model = model - self._usage = usage - self._content_blocks = content_blocks - self._response_id = response_id + stream._model = model + stream._usage = usage + stream._content_blocks = content_blocks + stream._response_id = response_id def _finish_streaming_span( @@ -749,7 +750,7 @@ def _finish_streaming_span( usage: "_RecordedUsage", content_blocks: "list[str]", response_id: "Optional[str]", -): +) -> None: """ Set output attributes on the AI Client Span and end the span. """ From 40e2bf029dd609bb9b1ded5acb7ba78b052001cf Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Mon, 16 Mar 2026 13:02:19 +0100 Subject: [PATCH 05/11] . --- sentry_sdk/integrations/anthropic.py | 2 +- tests/integrations/anthropic/test_anthropic.py | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index 27acdafbe4..98d4b50ed9 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -123,7 +123,7 @@ def setup_once() -> None: # Before https://github.com/anthropics/anthropic-sdk-python/commit/b1a1c0354a9aca450a7d512fdbdeb59c0ead688a # MessageStream inherits from Stream, so patching Stream is sufficient on these versions. - if version >= (0, 26, 2): + if version is not None and version >= (0, 26, 2): MessageStream.__iter__ = _wrap_message_stream_iter(MessageStream.__iter__) MessageStream.__next__ = _wrap_message_stream_next(MessageStream.__next__) MessageStream.close = _wrap_message_stream_close(MessageStream.close) diff --git a/tests/integrations/anthropic/test_anthropic.py b/tests/integrations/anthropic/test_anthropic.py index d9d5c0c30e..bf89037660 100644 --- a/tests/integrations/anthropic/test_anthropic.py +++ b/tests/integrations/anthropic/test_anthropic.py @@ -821,6 +821,7 @@ def test_stream_messages_iterator_methods( next(stream) next(stream) list(islice(stream, 1)) + next(stream) # New versions add TextEvent, so consume one more event. if TextEvent is not None and isinstance(next(stream), TextEvent): next(stream) From a3cc18ff19c59bd708abce2cd715b8bb59e0f2bd Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Mon, 16 Mar 2026 14:03:45 +0100 Subject: [PATCH 06/11] simplify --- sentry_sdk/integrations/anthropic.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index 98d4b50ed9..4da1752895 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -123,7 +123,7 @@ def setup_once() -> None: # Before https://github.com/anthropics/anthropic-sdk-python/commit/b1a1c0354a9aca450a7d512fdbdeb59c0ead688a # MessageStream inherits from Stream, so patching Stream is sufficient on these versions. - if version is not None and version >= (0, 26, 2): + if not issubclass(MessageStream, Stream): MessageStream.__iter__ = _wrap_message_stream_iter(MessageStream.__iter__) MessageStream.__next__ = _wrap_message_stream_next(MessageStream.__next__) MessageStream.close = _wrap_message_stream_close(MessageStream.close) @@ -786,8 +786,7 @@ def _wrap_stream_iter( def __iter__(self: "Stream") -> "Iterator[RawMessageStreamEvent]": if not hasattr(self, "_span"): - for event in f(self): - yield event + yield from f(self) return _initialize_data_accumulation_state(self) @@ -988,8 +987,7 @@ def _wrap_message_stream_iter( def __iter__(self: "MessageStream") -> "Iterator[MessageStreamEvent]": if not hasattr(self, "_span"): - for event in f(self): - yield event + yield from f(self) return _initialize_data_accumulation_state(self) From 0aeec7262f09978d76cce84c7d9d5ee649b63a95 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Tue, 17 Mar 2026 12:56:47 +0100 Subject: [PATCH 07/11] update tests --- tests/integrations/anthropic/test_anthropic.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/tests/integrations/anthropic/test_anthropic.py b/tests/integrations/anthropic/test_anthropic.py index 487c69e208..4fd3c48228 100644 --- a/tests/integrations/anthropic/test_anthropic.py +++ b/tests/integrations/anthropic/test_anthropic.py @@ -371,7 +371,7 @@ def test_streaming_create_message_next_consumption( ), ContentBlockStopEvent(type="content_block_stop", index=0), MessageDeltaEvent( - delta=Delta(), + delta=Delta(stop_reason="max_tokens"), usage=MessageDeltaUsage(output_tokens=10), type="message_delta", ), @@ -431,6 +431,7 @@ def test_streaming_create_message_next_consumption( assert span["data"][SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS] == 20 assert span["data"][SPANDATA.GEN_AI_RESPONSE_STREAMING] is True assert span["data"][SPANDATA.GEN_AI_RESPONSE_ID] == "msg_01XFDUDYJgAACzvnptvVoYEL" + assert span["data"][SPANDATA.GEN_AI_RESPONSE_FINISH_REASONS] == ["max_tokens"] def test_streaming_create_message_iterator_methods( @@ -470,7 +471,7 @@ def test_streaming_create_message_iterator_methods( ), ContentBlockStopEvent(type="content_block_stop", index=0), MessageDeltaEvent( - delta=Delta(), + delta=Delta(stop_reason="max_tokens"), usage=MessageDeltaUsage(output_tokens=10), type="message_delta", ), @@ -689,7 +690,7 @@ def test_stream_messages_next_consumption( ), ContentBlockStopEvent(type="content_block_stop", index=0), MessageDeltaEvent( - delta=Delta(), + delta=Delta(stop_reason="max_tokens"), usage=MessageDeltaUsage(output_tokens=10), type="message_delta", ), @@ -750,6 +751,7 @@ def test_stream_messages_next_consumption( assert span["data"][SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS] == 20 assert span["data"][SPANDATA.GEN_AI_RESPONSE_STREAMING] is True assert span["data"][SPANDATA.GEN_AI_RESPONSE_ID] == "msg_01XFDUDYJgAACzvnptvVoYEL" + assert span["data"][SPANDATA.GEN_AI_RESPONSE_FINISH_REASONS] == ["max_tokens"] def test_stream_messages_iterator_methods( @@ -789,7 +791,7 @@ def test_stream_messages_iterator_methods( ), ContentBlockStopEvent(type="content_block_stop", index=0), MessageDeltaEvent( - delta=Delta(), + delta=Delta(stop_reason="max_tokens"), usage=MessageDeltaUsage(output_tokens=10), type="message_delta", ), From b92db6df0e143f8b1b57c383852ed81dfb80a496 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Tue, 17 Mar 2026 13:15:04 +0100 Subject: [PATCH 08/11] docstring --- sentry_sdk/integrations/anthropic.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index 575ff26fd4..ed87a8a530 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -94,12 +94,15 @@ def setup_once() -> None: _check_minimum_version(AnthropicIntegration, version) """ - client.messages.create(stream=True) returns an instance of the Stream class, which implements the iterator protocol. + client.messages.create(stream=True) can return an instance of the Stream class, which implements the iterator protocol. The underlying stream can be consumed using either __iter__ or __next__, so both are patched to intercept streamed events. The streamed events are used to populate output attributes on the AI Client Span. - The close() method is patched for situations in which the method is directly invoked by the user, and otherwise - the finally block in the __iter__ patch closes the span. + The span is finished in two possible places: + - When the user exits the context manager or directly calls close(), the patched close() ends the span. + - When iteration ends, the finally block in the __iter__ patch or the except block in the __next__ patch finishes the span. + + Both paths may run, for example, when the iterator is exhausted and then the context manager exits. """ Messages.create = _wrap_message_create(Messages.create) Stream.__iter__ = _wrap_stream_iter(Stream.__iter__) @@ -109,12 +112,15 @@ def setup_once() -> None: AsyncMessages.create = _wrap_message_create_async(AsyncMessages.create) """ - client.messages.stream() returns an instance of the MessageStream class, which implements the iterator protocol. + client.messages.stream() can return an instance of the MessageStream class, which implements the iterator protocol. The underlying stream can be consumed using either __iter__ or __next__, so both are patched to intercept streamed events. The streamed events are used to populate output attributes on the AI Client Span. - The close() method is patched for situations in which the method is directly invoked by the user, and otherwise - the finally block in the __iter__ patch closes the span. + The span is finished in two possible places: + - When the user exits the context manager or directly calls close(), the patched close() ends the span. + - When iteration ends, the finally block in the __iter__ patch or the except block in the __next__ patch finishes the span. + + Both paths may run, for example, when the iterator is exhausted and then the context manager exits. """ Messages.stream = _wrap_message_stream(Messages.stream) MessageStreamManager.__enter__ = _wrap_message_stream_manager_enter( From beb8f2c792b7b1108807e7d08d5b6fc080977178 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Tue, 17 Mar 2026 13:26:45 +0100 Subject: [PATCH 09/11] docstrings --- sentry_sdk/integrations/anthropic.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index ed87a8a530..4cbaba0f4a 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -802,7 +802,8 @@ def _wrap_stream_iter( ) -> "Callable[..., Iterator[RawMessageStreamEvent]]": """ Accumulates output data while iterating. When the returned iterator ends, set - output attributes on the AI Client Span and end the span. + output attributes on the AI Client Span and ends the span (unless the `close()` + or `__next__()` patches have already closed it). """ def __iter__(self: "Stream") -> "Iterator[RawMessageStreamEvent]": @@ -824,6 +825,7 @@ def _wrap_stream_next( ) -> "Callable[..., RawMessageStreamEvent]": """ Accumulates output data from the returned event. + Closes the AI Client Span if `StopIteration` is raised. """ def __next__(self: "Stream") -> "RawMessageStreamEvent": @@ -858,7 +860,8 @@ def _wrap_stream_close( f: "Callable[..., None]", ) -> "Callable[..., None]": """ - Closes the AI Client Span, unless the finally block in `_wrap_synchronous_message_iterator()` runs first. + Closes the AI Client Span, unless the finally block in `_wrap_synchronous_message_iterator()` or + the except block in the `__next__()` patch runs first. """ def close(self: "Stream") -> None: From 7ec95fe46b97728d62717b4a9d84503e1fc3a6a1 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Tue, 17 Mar 2026 13:28:38 +0100 Subject: [PATCH 10/11] docs --- sentry_sdk/integrations/anthropic.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index 4cbaba0f4a..bdad51adef 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -1008,7 +1008,8 @@ def _wrap_message_stream_iter( ) -> "Callable[..., Iterator[MessageStreamEvent]]": """ Accumulates output data while iterating. When the returned iterator ends, set - output attributes on the AI Client Span and end the span. + output attributes on the AI Client Span and ends the span (unless the `close()` + or `__next__()` patches have already closed it). """ def __iter__(self: "MessageStream") -> "Iterator[MessageStreamEvent]": @@ -1030,6 +1031,7 @@ def _wrap_message_stream_next( ) -> "Callable[..., MessageStreamEvent]": """ Accumulates output data from the returned event. + Closes the AI Client Span if `StopIteration` is raised. """ def __next__(self: "MessageStream") -> "MessageStreamEvent": @@ -1064,7 +1066,8 @@ def _wrap_message_stream_close( f: "Callable[..., None]", ) -> "Callable[..., None]": """ - Closes the AI Client Span, unless the finally block in `_wrap_synchronous_message_iterator()` runs first. + Closes the AI Client Span, unless the finally block in `_wrap_synchronous_message_iterator()` or + the except block in the `__next__()` patch runs first. """ def close(self: "MessageStream") -> None: From cda41e20c60ed25fc4a9fc99d62c90531c50ace6 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Tue, 17 Mar 2026 13:53:52 +0100 Subject: [PATCH 11/11] review --- sentry_sdk/integrations/anthropic.py | 50 +++++++++++++--------------- 1 file changed, 24 insertions(+), 26 deletions(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index bdad51adef..40afd82d3d 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -835,19 +835,17 @@ def __next__(self: "Stream") -> "RawMessageStreamEvent": except StopIteration: exc_info = sys.exc_info() with capture_internal_exceptions(): - if not hasattr(self, "_span"): - raise - - _finish_streaming_span( - self._span, - self._integration, - self._model, - self._usage, - self._content_blocks, - self._response_id, - self._finish_reason, - ) - del self._span + if hasattr(self, "_span"): + _finish_streaming_span( + self._span, + self._integration, + self._model, + self._usage, + self._content_blocks, + self._response_id, + self._finish_reason, + ) + del self._span reraise(*exc_info) _accumulate_event_data(self, event) @@ -870,6 +868,7 @@ def close(self: "Stream") -> None: if not hasattr(self, "_model"): self._span.__exit__(None, None, None) + del self._span return f(self) _finish_streaming_span( @@ -1041,19 +1040,17 @@ def __next__(self: "MessageStream") -> "MessageStreamEvent": except StopIteration: exc_info = sys.exc_info() with capture_internal_exceptions(): - if not hasattr(self, "_span"): - raise - - _finish_streaming_span( - self._span, - self._integration, - self._model, - self._usage, - self._content_blocks, - self._response_id, - self._finish_reason, - ) - del self._span + if hasattr(self, "_span"): + _finish_streaming_span( + self._span, + self._integration, + self._model, + self._usage, + self._content_blocks, + self._response_id, + self._finish_reason, + ) + del self._span reraise(*exc_info) _accumulate_event_data(self, event) @@ -1076,6 +1073,7 @@ def close(self: "MessageStream") -> None: if not hasattr(self, "_model"): self._span.__exit__(None, None, None) + del self._span return f(self) _finish_streaming_span(