diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index 32522a7234..40afd82d3d 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -39,7 +39,11 @@ from anthropic import Stream, AsyncStream from anthropic.resources import AsyncMessages, Messages - from anthropic.lib.streaming import MessageStreamManager, AsyncMessageStreamManager + from anthropic.lib.streaming import ( + MessageStreamManager, + MessageStream, + AsyncMessageStreamManager, + ) from anthropic.types import ( MessageStartEvent, @@ -56,7 +60,7 @@ raise DidNotEnable("Anthropic not installed") if TYPE_CHECKING: - from typing import Any, AsyncIterator, Iterator, Optional, Union + from typing import Any, AsyncIterator, Iterator, Optional, Union, Callable from sentry_sdk.tracing import Span from sentry_sdk._types import TextPart @@ -67,7 +71,7 @@ TextBlockParam, ToolUnionParam, ) - from anthropic.lib.streaming import MessageStream, AsyncMessageStream + from anthropic.lib.streaming import AsyncMessageStream class _RecordedUsage: @@ -89,14 +93,47 @@ def setup_once() -> None: version = package_version("anthropic") _check_minimum_version(AnthropicIntegration, version) + """ + client.messages.create(stream=True) can return an instance of the Stream class, which implements the iterator protocol. + The underlying stream can be consumed using either __iter__ or __next__, so both are patched to intercept + streamed events. The streamed events are used to populate output attributes on the AI Client Span. + + The span is finished in two possible places: + - When the user exits the context manager or directly calls close(), the patched close() ends the span. + - When iteration ends, the finally block in the __iter__ patch or the except block in the __next__ patch finishes the span. + + Both paths may run, for example, when the iterator is exhausted and then the context manager exits. + """ Messages.create = _wrap_message_create(Messages.create) + Stream.__iter__ = _wrap_stream_iter(Stream.__iter__) + Stream.__next__ = _wrap_stream_next(Stream.__next__) + Stream.close = _wrap_stream_close(Stream.close) + AsyncMessages.create = _wrap_message_create_async(AsyncMessages.create) + """ + client.messages.stream() can return an instance of the MessageStream class, which implements the iterator protocol. + The underlying stream can be consumed using either __iter__ or __next__, so both are patched to intercept + streamed events. The streamed events are used to populate output attributes on the AI Client Span. + + The span is finished in two possible places: + - When the user exits the context manager or directly calls close(), the patched close() ends the span. + - When iteration ends, the finally block in the __iter__ patch or the except block in the __next__ patch finishes the span. + + Both paths may run, for example, when the iterator is exhausted and then the context manager exits. + """ Messages.stream = _wrap_message_stream(Messages.stream) MessageStreamManager.__enter__ = _wrap_message_stream_manager_enter( MessageStreamManager.__enter__ ) + # Before https://github.com/anthropics/anthropic-sdk-python/commit/b1a1c0354a9aca450a7d512fdbdeb59c0ead688a + # MessageStream inherits from Stream, so patching Stream is sufficient on these versions. + if not issubclass(MessageStream, Stream): + MessageStream.__iter__ = _wrap_message_stream_iter(MessageStream.__iter__) + MessageStream.__next__ = _wrap_message_stream_next(MessageStream.__next__) + MessageStream.close = _wrap_message_stream_close(MessageStream.close) + AsyncMessages.stream = _wrap_async_message_stream(AsyncMessages.stream) AsyncMessageStreamManager.__aenter__ = ( _wrap_async_message_stream_manager_aenter( @@ -399,21 +436,14 @@ def _set_create_input_data( def _wrap_synchronous_message_iterator( + stream: "Union[Stream, MessageStream]", iterator: "Iterator[Union[RawMessageStreamEvent, MessageStreamEvent]]", - span: "Span", - integration: "AnthropicIntegration", ) -> "Iterator[Union[RawMessageStreamEvent, MessageStreamEvent]]": """ Sets information received while iterating the response stream on the AI Client Span. - Responsible for closing the AI Client Span. + Responsible for closing the AI Client Span, unless the span has already been closed in the close() patch. """ - - model = None - usage = _RecordedUsage() - content_blocks: "list[str]" = [] - response_id = None - finish_reason = None - + generator_exit = False try: for event in iterator: # Message and content types are aliases for corresponding Raw* types, introduced in @@ -432,40 +462,26 @@ def _wrap_synchronous_message_iterator( yield event continue - (model, usage, content_blocks, response_id, finish_reason) = ( - _collect_ai_data( - event, - model, - usage, - content_blocks, - response_id, - finish_reason, - ) - ) + _accumulate_event_data(stream, event) yield event + except ( + GeneratorExit + ): # https://docs.python.org/3/reference/expressions.html#generator.close + generator_exit = True + raise finally: with capture_internal_exceptions(): - # Anthropic's input_tokens excludes cached/cache_write tokens. - # Normalize to total input tokens for correct cost calculations. - total_input = ( - usage.input_tokens - + (usage.cache_read_input_tokens or 0) - + (usage.cache_write_input_tokens or 0) - ) - - _set_output_data( - span=span, - integration=integration, - model=model, - input_tokens=total_input, - output_tokens=usage.output_tokens, - cache_read_input_tokens=usage.cache_read_input_tokens, - cache_write_input_tokens=usage.cache_write_input_tokens, - content_blocks=[{"text": "".join(content_blocks), "type": "text"}], - finish_span=True, - response_id=response_id, - finish_reason=finish_reason, - ) + if not generator_exit and hasattr(stream, "_span"): + _finish_streaming_span( + stream._span, + stream._integration, + stream._model, + stream._usage, + stream._content_blocks, + stream._response_id, + stream._finish_reason, + ) + del stream._span async def _wrap_asynchronous_message_iterator( @@ -625,9 +641,8 @@ def _sentry_patched_create_common(f: "Any", *args: "Any", **kwargs: "Any") -> "A result = yield f, args, kwargs if isinstance(result, Stream): - result._iterator = _wrap_synchronous_message_iterator( - result._iterator, span, integration - ) + result._span = span + result._integration = integration return result if isinstance(result, AsyncStream): @@ -712,6 +727,166 @@ def _sentry_patched_create_sync(*args: "Any", **kwargs: "Any") -> "Any": return _sentry_patched_create_sync +def _initialize_data_accumulation_state(stream: "Union[Stream, MessageStream]") -> None: + """ + Initialize fields for accumulating output on the Stream instance. + """ + if not hasattr(stream, "_model"): + stream._model = None + stream._usage = _RecordedUsage() + stream._content_blocks = [] + stream._response_id = None + stream._finish_reason = None + + +def _accumulate_event_data( + stream: "Union[Stream, MessageStream]", + event: "Union[RawMessageStreamEvent, MessageStreamEvent]", +) -> None: + """ + Update accumulated output from a single stream event. + """ + (model, usage, content_blocks, response_id, finish_reason) = _collect_ai_data( + event, + stream._model, + stream._usage, + stream._content_blocks, + stream._response_id, + stream._finish_reason, + ) + + stream._model = model + stream._usage = usage + stream._content_blocks = content_blocks + stream._response_id = response_id + stream._finish_reason = finish_reason + + +def _finish_streaming_span( + span: "Span", + integration: "AnthropicIntegration", + model: "Optional[str]", + usage: "_RecordedUsage", + content_blocks: "list[str]", + response_id: "Optional[str]", + finish_reason: "Optional[str]", +) -> None: + """ + Set output attributes on the AI Client Span and end the span. + """ + # Anthropic's input_tokens excludes cached/cache_write tokens. + # Normalize to total input tokens for correct cost calculations. + total_input = ( + usage.input_tokens + + (usage.cache_read_input_tokens or 0) + + (usage.cache_write_input_tokens or 0) + ) + + _set_output_data( + span=span, + integration=integration, + model=model, + input_tokens=total_input, + output_tokens=usage.output_tokens, + cache_read_input_tokens=usage.cache_read_input_tokens, + cache_write_input_tokens=usage.cache_write_input_tokens, + content_blocks=[{"text": "".join(content_blocks), "type": "text"}], + finish_span=True, + response_id=response_id, + finish_reason=finish_reason, + ) + + +def _wrap_stream_iter( + f: "Callable[..., Iterator[RawMessageStreamEvent]]", +) -> "Callable[..., Iterator[RawMessageStreamEvent]]": + """ + Accumulates output data while iterating. When the returned iterator ends, set + output attributes on the AI Client Span and ends the span (unless the `close()` + or `__next__()` patches have already closed it). + """ + + def __iter__(self: "Stream") -> "Iterator[RawMessageStreamEvent]": + if not hasattr(self, "_span"): + yield from f(self) + return + + _initialize_data_accumulation_state(self) + yield from _wrap_synchronous_message_iterator( + self, + f(self), + ) + + return __iter__ + + +def _wrap_stream_next( + f: "Callable[..., RawMessageStreamEvent]", +) -> "Callable[..., RawMessageStreamEvent]": + """ + Accumulates output data from the returned event. + Closes the AI Client Span if `StopIteration` is raised. + """ + + def __next__(self: "Stream") -> "RawMessageStreamEvent": + _initialize_data_accumulation_state(self) + try: + event = f(self) + except StopIteration: + exc_info = sys.exc_info() + with capture_internal_exceptions(): + if hasattr(self, "_span"): + _finish_streaming_span( + self._span, + self._integration, + self._model, + self._usage, + self._content_blocks, + self._response_id, + self._finish_reason, + ) + del self._span + reraise(*exc_info) + + _accumulate_event_data(self, event) + return event + + return __next__ + + +def _wrap_stream_close( + f: "Callable[..., None]", +) -> "Callable[..., None]": + """ + Closes the AI Client Span, unless the finally block in `_wrap_synchronous_message_iterator()` or + the except block in the `__next__()` patch runs first. + """ + + def close(self: "Stream") -> None: + if not hasattr(self, "_span"): + return f(self) + + if not hasattr(self, "_model"): + self._span.__exit__(None, None, None) + del self._span + return f(self) + + _finish_streaming_span( + self._span, + self._integration, + self._model, + self._usage, + self._content_blocks, + self._response_id, + self._finish_reason, + ) + del self._span + + return f(self) + + return close + + def _wrap_message_create_async(f: "Any") -> "Any": async def _execute_async(f: "Any", *args: "Any", **kwargs: "Any") -> "Any": gen = _sentry_patched_create_common(f, *args, **kwargs) @@ -819,17 +994,104 @@ def _sentry_patched_enter(self: "MessageStreamManager") -> "MessageStream": tools=self._tools, ) - stream._iterator = _wrap_synchronous_message_iterator( - iterator=stream._iterator, - span=span, - integration=integration, - ) + stream._span = span + stream._integration = integration return stream return _sentry_patched_enter +def _wrap_message_stream_iter( + f: "Callable[..., Iterator[MessageStreamEvent]]", +) -> "Callable[..., Iterator[MessageStreamEvent]]": + """ + Accumulates output data while iterating. When the returned iterator ends, set + output attributes on the AI Client Span and ends the span (unless the `close()` + or `__next__()` patches have already closed it). + """ + + def __iter__(self: "MessageStream") -> "Iterator[MessageStreamEvent]": + if not hasattr(self, "_span"): + yield from f(self) + return + + _initialize_data_accumulation_state(self) + yield from _wrap_synchronous_message_iterator( + self, + f(self), + ) + + return __iter__ + + +def _wrap_message_stream_next( + f: "Callable[..., MessageStreamEvent]", +) -> "Callable[..., MessageStreamEvent]": + """ + Accumulates output data from the returned event. + Closes the AI Client Span if `StopIteration` is raised. + """ + + def __next__(self: "MessageStream") -> "MessageStreamEvent": + _initialize_data_accumulation_state(self) + try: + event = f(self) + except StopIteration: + exc_info = sys.exc_info() + with capture_internal_exceptions(): + if hasattr(self, "_span"): + _finish_streaming_span( + self._span, + self._integration, + self._model, + self._usage, + self._content_blocks, + self._response_id, + self._finish_reason, + ) + del self._span + reraise(*exc_info) + + _accumulate_event_data(self, event) + return event + + return __next__ + + +def _wrap_message_stream_close( + f: "Callable[..., None]", +) -> "Callable[..., None]": + """ + Closes the AI Client Span, unless the finally block in `_wrap_synchronous_message_iterator()` or + the except block in the `__next__()` patch runs first. + """ + + def close(self: "MessageStream") -> None: + if not hasattr(self, "_span"): + return f(self) + + if not hasattr(self, "_model"): + self._span.__exit__(None, None, None) + del self._span + return f(self) + + _finish_streaming_span( + self._span, + self._integration, + self._model, + self._usage, + self._content_blocks, + self._response_id, + self._finish_reason, + ) + del self._span + + return f(self) + + return close + + def _wrap_async_message_stream(f: "Any") -> "Any": """ Attaches user-provided arguments to the returned context manager. diff --git a/tests/integrations/anthropic/test_anthropic.py b/tests/integrations/anthropic/test_anthropic.py index 8b83d2d128..4fd3c48228 100644 --- a/tests/integrations/anthropic/test_anthropic.py +++ b/tests/integrations/anthropic/test_anthropic.py @@ -1,6 +1,7 @@ import pytest from unittest import mock import json +from itertools import islice try: from unittest.mock import AsyncMock @@ -28,6 +29,11 @@ async def __call__(self, *args, **kwargs): except ImportError: pass +try: + from anthropic.lib.streaming import TextEvent +except ImportError: + TextEvent = None + try: # 0.27+ from anthropic.types.raw_message_delta_event import Delta @@ -328,6 +334,208 @@ def test_streaming_create_message( assert span["data"][SPANDATA.GEN_AI_RESPONSE_FINISH_REASONS] == ["max_tokens"] +def test_streaming_create_message_next_consumption( + sentry_init, + capture_events, + get_model_response, + server_side_event_chunks, +): + client = Anthropic(api_key="z") + + response = get_model_response( + server_side_event_chunks( + [ + MessageStartEvent( + message=EXAMPLE_MESSAGE, + type="message_start", + ), + ContentBlockStartEvent( + type="content_block_start", + index=0, + content_block=TextBlock(type="text", text=""), + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="Hi", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text=" I'm Claude!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockStopEvent(type="content_block_stop", index=0), + MessageDeltaEvent( + delta=Delta(stop_reason="max_tokens"), + usage=MessageDeltaUsage(output_tokens=10), + type="message_delta", + ), + ] + ) + ) + + sentry_init( + integrations=[AnthropicIntegration(include_prompts=True)], + traces_sample_rate=1.0, + send_default_pii=True, + ) + events = capture_events() + + messages = [ + { + "role": "user", + "content": "Hello, Claude", + } + ] + + with pytest.raises(StopIteration), mock.patch.object( + client._client, + "send", + return_value=response, + ) as _: + with start_transaction(name="anthropic"): + messages = client.messages.create( + max_tokens=1024, messages=messages, model="model", stream=True + ) + + while True: + next(messages) + + assert len(events) == 1 + (event,) = events + + assert event["type"] == "transaction" + assert event["transaction"] == "anthropic" + + span = next(span for span in event["spans"] if span["op"] == OP.GEN_AI_CHAT) + + assert span["op"] == OP.GEN_AI_CHAT + assert span["description"] == "chat model" + assert span["data"][SPANDATA.GEN_AI_SYSTEM] == "anthropic" + assert span["data"][SPANDATA.GEN_AI_OPERATION_NAME] == "chat" + assert span["data"][SPANDATA.GEN_AI_REQUEST_MODEL] == "model" + + assert ( + span["data"][SPANDATA.GEN_AI_REQUEST_MESSAGES] + == '[{"role": "user", "content": "Hello, Claude"}]' + ) + assert span["data"][SPANDATA.GEN_AI_RESPONSE_TEXT] == "Hi! I'm Claude!" + + assert span["data"][SPANDATA.GEN_AI_USAGE_INPUT_TOKENS] == 10 + assert span["data"][SPANDATA.GEN_AI_USAGE_OUTPUT_TOKENS] == 10 + assert span["data"][SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS] == 20 + assert span["data"][SPANDATA.GEN_AI_RESPONSE_STREAMING] is True + assert span["data"][SPANDATA.GEN_AI_RESPONSE_ID] == "msg_01XFDUDYJgAACzvnptvVoYEL" + assert span["data"][SPANDATA.GEN_AI_RESPONSE_FINISH_REASONS] == ["max_tokens"] + + +def test_streaming_create_message_iterator_methods( + sentry_init, + capture_events, + get_model_response, + server_side_event_chunks, +): + client = Anthropic(api_key="z") + + response = get_model_response( + server_side_event_chunks( + [ + MessageStartEvent( + message=EXAMPLE_MESSAGE, + type="message_start", + ), + ContentBlockStartEvent( + type="content_block_start", + index=0, + content_block=TextBlock(type="text", text=""), + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="Hi", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text=" I'm Claude!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockStopEvent(type="content_block_stop", index=0), + MessageDeltaEvent( + delta=Delta(stop_reason="max_tokens"), + usage=MessageDeltaUsage(output_tokens=10), + type="message_delta", + ), + ] + ) + ) + + sentry_init( + integrations=[AnthropicIntegration(include_prompts=True)], + traces_sample_rate=1.0, + send_default_pii=True, + ) + events = capture_events() + + messages = [ + { + "role": "user", + "content": "Hello, Claude", + } + ] + + with mock.patch.object( + client._client, + "send", + return_value=response, + ) as _: + with start_transaction(name="anthropic"): + messages = client.messages.create( + max_tokens=1024, messages=messages, model="model", stream=True + ) + + next(messages) + next(messages) + list(islice(messages, 1)) + next(messages) + messages.close() + + assert len(events) == 1 + (event,) = events + + assert event["type"] == "transaction" + assert event["transaction"] == "anthropic" + + span = next(span for span in event["spans"] if span["op"] == OP.GEN_AI_CHAT) + + assert span["op"] == OP.GEN_AI_CHAT + assert span["description"] == "chat model" + assert span["data"][SPANDATA.GEN_AI_SYSTEM] == "anthropic" + assert span["data"][SPANDATA.GEN_AI_OPERATION_NAME] == "chat" + assert span["data"][SPANDATA.GEN_AI_REQUEST_MODEL] == "model" + + assert ( + span["data"][SPANDATA.GEN_AI_REQUEST_MESSAGES] + == '[{"role": "user", "content": "Hello, Claude"}]' + ) + assert span["data"][SPANDATA.GEN_AI_RESPONSE_TEXT] == "Hi!" + + assert span["data"][SPANDATA.GEN_AI_USAGE_INPUT_TOKENS] == 10 + assert span["data"][SPANDATA.GEN_AI_USAGE_OUTPUT_TOKENS] == 20 + assert span["data"][SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS] == 30 + assert span["data"][SPANDATA.GEN_AI_RESPONSE_STREAMING] is True + assert span["data"][SPANDATA.GEN_AI_RESPONSE_ID] == "msg_01XFDUDYJgAACzvnptvVoYEL" + + @pytest.mark.parametrize( "send_default_pii, include_prompts", [ @@ -445,6 +653,213 @@ def test_stream_messages( assert span["data"][SPANDATA.GEN_AI_RESPONSE_FINISH_REASONS] == ["max_tokens"] +def test_stream_messages_next_consumption( + sentry_init, + capture_events, + get_model_response, + server_side_event_chunks, +): + client = Anthropic(api_key="z") + + response = get_model_response( + server_side_event_chunks( + [ + MessageStartEvent( + message=EXAMPLE_MESSAGE, + type="message_start", + ), + ContentBlockStartEvent( + type="content_block_start", + index=0, + content_block=TextBlock(type="text", text=""), + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="Hi", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text=" I'm Claude!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockStopEvent(type="content_block_stop", index=0), + MessageDeltaEvent( + delta=Delta(stop_reason="max_tokens"), + usage=MessageDeltaUsage(output_tokens=10), + type="message_delta", + ), + ] + ) + ) + + sentry_init( + integrations=[AnthropicIntegration(include_prompts=True)], + traces_sample_rate=1.0, + send_default_pii=True, + ) + events = capture_events() + + messages = [ + { + "role": "user", + "content": "Hello, Claude", + } + ] + + with pytest.raises(StopIteration), mock.patch.object( + client._client, + "send", + return_value=response, + ) as _: + with start_transaction(name="anthropic"): + with client.messages.stream( + max_tokens=1024, + messages=messages, + model="model", + ) as stream: + while True: + next(stream) + + assert len(events) == 1 + (event,) = events + + assert event["type"] == "transaction" + assert event["transaction"] == "anthropic" + + span = next(span for span in event["spans"] if span["op"] == OP.GEN_AI_CHAT) + + assert span["op"] == OP.GEN_AI_CHAT + assert span["description"] == "chat model" + assert span["data"][SPANDATA.GEN_AI_SYSTEM] == "anthropic" + assert span["data"][SPANDATA.GEN_AI_OPERATION_NAME] == "chat" + assert span["data"][SPANDATA.GEN_AI_REQUEST_MODEL] == "model" + + assert ( + span["data"][SPANDATA.GEN_AI_REQUEST_MESSAGES] + == '[{"role": "user", "content": "Hello, Claude"}]' + ) + assert span["data"][SPANDATA.GEN_AI_RESPONSE_TEXT] == "Hi! I'm Claude!" + + assert span["data"][SPANDATA.GEN_AI_USAGE_INPUT_TOKENS] == 10 + assert span["data"][SPANDATA.GEN_AI_USAGE_OUTPUT_TOKENS] == 10 + assert span["data"][SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS] == 20 + assert span["data"][SPANDATA.GEN_AI_RESPONSE_STREAMING] is True + assert span["data"][SPANDATA.GEN_AI_RESPONSE_ID] == "msg_01XFDUDYJgAACzvnptvVoYEL" + assert span["data"][SPANDATA.GEN_AI_RESPONSE_FINISH_REASONS] == ["max_tokens"] + + +def test_stream_messages_iterator_methods( + sentry_init, + capture_events, + get_model_response, + server_side_event_chunks, +): + client = Anthropic(api_key="z") + + response = get_model_response( + server_side_event_chunks( + [ + MessageStartEvent( + message=EXAMPLE_MESSAGE, + type="message_start", + ), + ContentBlockStartEvent( + type="content_block_start", + index=0, + content_block=TextBlock(type="text", text=""), + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="Hi", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text=" I'm Claude!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockStopEvent(type="content_block_stop", index=0), + MessageDeltaEvent( + delta=Delta(stop_reason="max_tokens"), + usage=MessageDeltaUsage(output_tokens=10), + type="message_delta", + ), + ] + ) + ) + + sentry_init( + integrations=[AnthropicIntegration(include_prompts=True)], + traces_sample_rate=1.0, + send_default_pii=True, + ) + events = capture_events() + + messages = [ + { + "role": "user", + "content": "Hello, Claude", + } + ] + + with mock.patch.object( + client._client, + "send", + return_value=response, + ) as _: + with start_transaction(name="anthropic"): + with client.messages.stream( + max_tokens=1024, + messages=messages, + model="model", + ) as stream: + next(stream) + next(stream) + list(islice(stream, 1)) + next(stream) + # New versions add TextEvent, so consume one more event. + if TextEvent is not None and isinstance(next(stream), TextEvent): + next(stream) + stream.close() + + assert len(events) == 1 + (event,) = events + + assert event["type"] == "transaction" + assert event["transaction"] == "anthropic" + + span = next(span for span in event["spans"] if span["op"] == OP.GEN_AI_CHAT) + + assert span["op"] == OP.GEN_AI_CHAT + assert span["description"] == "chat model" + assert span["data"][SPANDATA.GEN_AI_SYSTEM] == "anthropic" + assert span["data"][SPANDATA.GEN_AI_OPERATION_NAME] == "chat" + assert span["data"][SPANDATA.GEN_AI_REQUEST_MODEL] == "model" + + assert ( + span["data"][SPANDATA.GEN_AI_REQUEST_MESSAGES] + == '[{"role": "user", "content": "Hello, Claude"}]' + ) + assert span["data"][SPANDATA.GEN_AI_RESPONSE_TEXT] == "Hi!" + + assert span["data"][SPANDATA.GEN_AI_USAGE_INPUT_TOKENS] == 10 + assert span["data"][SPANDATA.GEN_AI_USAGE_OUTPUT_TOKENS] == 20 + assert span["data"][SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS] == 30 + assert span["data"][SPANDATA.GEN_AI_RESPONSE_STREAMING] is True + assert span["data"][SPANDATA.GEN_AI_RESPONSE_ID] == "msg_01XFDUDYJgAACzvnptvVoYEL" + + @pytest.mark.asyncio @pytest.mark.parametrize( "send_default_pii, include_prompts",