From eac253ade2b8f91883bf60f8345ae64fd0d00b5b Mon Sep 17 00:00:00 2001 From: Bernhard Czypka <130161325+czyber@users.noreply.github.com> Date: Thu, 2 May 2024 13:12:03 +0200 Subject: [PATCH] feat(integrations): Add Anthropic Integration (#2831) This PR adds an anthropic integration. It supports the creation of messages in streaming and non-streaming mode. --------- Co-authored-by: Anton Pirker --- .../test-integrations-data-processing.yml | 8 + mypy.ini | 2 + .../split-tox-gh-actions.py | 1 + sentry_sdk/consts.py | 1 + sentry_sdk/integrations/anthropic.py | 170 ++++++++++++++ setup.py | 1 + tests/integrations/anthropic/__init__.py | 3 + .../integrations/anthropic/test_anthropic.py | 210 ++++++++++++++++++ tox.ini | 10 + 9 files changed, 406 insertions(+) create mode 100644 sentry_sdk/integrations/anthropic.py create mode 100644 tests/integrations/anthropic/__init__.py create mode 100644 tests/integrations/anthropic/test_anthropic.py diff --git a/.github/workflows/test-integrations-data-processing.yml b/.github/workflows/test-integrations-data-processing.yml index 1f618bd93d..28c788d69a 100644 --- a/.github/workflows/test-integrations-data-processing.yml +++ b/.github/workflows/test-integrations-data-processing.yml @@ -42,6 +42,10 @@ jobs: - name: Erase coverage run: | coverage erase + - name: Test anthropic latest + run: | + set -x # print commands that are executed + ./scripts/runtox.sh "py${{ matrix.python-version }}-anthropic-latest" --cov=tests --cov=sentry_sdk --cov-report= --cov-branch - name: Test arq latest run: | set -x # print commands that are executed @@ -102,6 +106,10 @@ jobs: - name: Erase coverage run: | coverage erase + - name: Test anthropic pinned + run: | + set -x # print commands that are executed + ./scripts/runtox.sh --exclude-latest "py${{ matrix.python-version }}-anthropic" --cov=tests --cov=sentry_sdk --cov-report= --cov-branch - name: Test arq pinned run: | set -x # print commands that are executed diff --git a/mypy.ini b/mypy.ini index 
844e140de2..0d8a60b64c 100644
--- a/mypy.ini
+++ b/mypy.ini
@@ -36,6 +36,8 @@ ignore_missing_imports = True
 ignore_missing_imports = True
 [mypy-aiohttp.*]
 ignore_missing_imports = True
+[mypy-anthropic.*]
+ignore_missing_imports = True
 [mypy-sanic.*]
 ignore_missing_imports = True
 [mypy-tornado.*]
diff --git a/scripts/split-tox-gh-actions/split-tox-gh-actions.py b/scripts/split-tox-gh-actions/split-tox-gh-actions.py
index 288725d2c5..53fa55d909 100755
--- a/scripts/split-tox-gh-actions/split-tox-gh-actions.py
+++ b/scripts/split-tox-gh-actions/split-tox-gh-actions.py
@@ -66,6 +66,7 @@
         "gcp",
     ],
     "Data Processing": [
+        "anthropic",
         "arq",
         "beam",
         "celery",
diff --git a/sentry_sdk/consts.py b/sentry_sdk/consts.py
index 19595ed7fa..3ffa384e04 100644
--- a/sentry_sdk/consts.py
+++ b/sentry_sdk/consts.py
@@ -296,6 +296,7 @@ class SPANDATA:
 
 
 class OP:
+    ANTHROPIC_MESSAGES_CREATE = "ai.messages.create.anthropic"
     CACHE_GET_ITEM = "cache.get_item"
     DB = "db"
     DB_REDIS = "db.redis"
diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py
new file mode 100644
index 0000000000..9d43093ac4
--- /dev/null
+++ b/sentry_sdk/integrations/anthropic.py
@@ -0,0 +1,203 @@
+from functools import wraps
+
+import sentry_sdk
+from sentry_sdk.ai.monitoring import record_token_usage
+from sentry_sdk.consts import OP, SPANDATA
+from sentry_sdk.integrations import DidNotEnable, Integration
+from sentry_sdk.scope import should_send_default_pii
+from sentry_sdk.utils import (
+    capture_internal_exceptions,
+    ensure_integration_enabled,
+    event_from_exception,
+    package_version,
+)
+
+try:
+    from anthropic.resources import Messages
+except ImportError:
+    raise DidNotEnable("Anthropic not installed")
+
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from typing import Any, Iterator, List
+    from anthropic.types import Message, MessageStreamEvent
+    from sentry_sdk.tracing import Span
+
+
+class AnthropicIntegration(Integration):
+    """Sentry integration that traces ``anthropic`` ``Messages.create`` calls.
+
+    :param include_prompts: Attach prompts and responses to the span. They
+        are only recorded when ``should_send_default_pii()`` is true as well.
+    """
+
+    identifier = "anthropic"
+
+    def __init__(self, include_prompts=True):
+        # type: (AnthropicIntegration, bool) -> None
+        self.include_prompts = include_prompts
+
+    @staticmethod
+    def setup_once():
+        # type: () -> None
+        version = package_version("anthropic")
+
+        if version is None:
+            raise DidNotEnable("Unparsable anthropic version.")
+
+        if version < (0, 16):
+            raise DidNotEnable("anthropic 0.16 or newer required.")
+
+        Messages.create = _wrap_message_create(Messages.create)
+
+
+def _capture_exception(exc):
+    # type: (Any) -> None
+    """Send ``exc`` to Sentry as an unhandled ``anthropic`` error event."""
+    event, hint = event_from_exception(
+        exc,
+        client_options=sentry_sdk.get_client().options,
+        mechanism={"type": "anthropic", "handled": False},
+    )
+    sentry_sdk.capture_event(event, hint=hint)
+
+
+def _calculate_token_usage(result, span):
+    # type: (Message, Span) -> None
+    """Record the token counts reported in ``result.usage`` on ``span``.
+
+    Counts that are missing or not integers are recorded as 0.
+    """
+    input_tokens = 0
+    output_tokens = 0
+    if hasattr(result, "usage"):
+        usage = result.usage
+        if hasattr(usage, "input_tokens") and isinstance(usage.input_tokens, int):
+            input_tokens = usage.input_tokens
+        if hasattr(usage, "output_tokens") and isinstance(usage.output_tokens, int):
+            output_tokens = usage.output_tokens
+
+    total_tokens = input_tokens + output_tokens
+    record_token_usage(span, input_tokens, output_tokens, total_tokens)
+
+
+def _wrap_stream(old_iterator, span, include_prompts):
+    # type: (Iterator[MessageStreamEvent], Span, bool) -> Iterator[MessageStreamEvent]
+    """Yield every event of ``old_iterator`` and finish ``span`` afterwards.
+
+    Token usage and streamed text are collected on the side. The span is
+    finished in a ``finally`` block so it is also closed when the underlying
+    stream raises or the consumer stops iterating early.
+    """
+    input_tokens = 0
+    output_tokens = 0
+    content_blocks = []  # type: List[str]
+    try:
+        for event in old_iterator:
+            # Only the bookkeeping is guarded: errors raised by the stream
+            # itself must reach the caller instead of being swallowed.
+            with capture_internal_exceptions():
+                event_type = getattr(event, "type", None)
+                if event_type == "message_start":
+                    usage = event.message.usage
+                    input_tokens += usage.input_tokens
+                    output_tokens += usage.output_tokens
+                elif event_type == "content_block_delta":
+                    # Deltas without a ``text`` attribute are skipped.
+                    text = getattr(event.delta, "text", None)
+                    if text is not None:
+                        content_blocks.append(text)
+                elif event_type == "message_delta":
+                    output_tokens += event.usage.output_tokens
+            # Every event, including ``message_stop``, is passed through.
+            yield event
+    finally:
+        with capture_internal_exceptions():
+            if include_prompts:
+                span.set_data(
+                    SPANDATA.AI_RESPONSES,
+                    [{"type": "text", "text": "".join(content_blocks)}],
+                )
+            record_token_usage(
+                span, input_tokens, output_tokens, input_tokens + output_tokens
+            )
+            span.set_data(SPANDATA.AI_STREAMING, True)
+        span.__exit__(None, None, None)
+
+
+def _wrap_message_create(f):
+    # type: (Any) -> Any
+    """Wrap ``Messages.create`` so each call is recorded as a span."""
+
+    @wraps(f)
+    @ensure_integration_enabled(AnthropicIntegration, f)
+    def _sentry_patched_create(*args, **kwargs):
+        # type: (*Any, **Any) -> Any
+        if "messages" not in kwargs:
+            return f(*args, **kwargs)
+
+        try:
+            iter(kwargs["messages"])
+        except TypeError:
+            return f(*args, **kwargs)
+
+        # Materialise the messages once and pass that list on, so a one-shot
+        # iterable is not already exhausted when the wrapped call reads it.
+        messages = list(kwargs["messages"])
+        kwargs["messages"] = messages
+        model = kwargs.get("model")
+
+        span = sentry_sdk.start_span(
+            op=OP.ANTHROPIC_MESSAGES_CREATE, description="Anthropic messages create"
+        )
+        span.__enter__()
+
+        try:
+            result = f(*args, **kwargs)
+        except Exception as exc:
+            _capture_exception(exc)
+            span.__exit__(None, None, None)
+            # Bare ``raise`` keeps the exception's ``__cause__`` intact.
+            raise
+
+        integration = sentry_sdk.get_client().get_integration(AnthropicIntegration)
+        include_prompts = (
+            integration is not None
+            and integration.include_prompts
+            and should_send_default_pii()
+        )
+
+        with capture_internal_exceptions():
+            span.set_data(SPANDATA.AI_MODEL_ID, model)
+            if include_prompts:
+                span.set_data(SPANDATA.AI_INPUT_MESSAGES, messages)
+
+        if not hasattr(result, "content") and hasattr(result, "_iterator"):
+            # Streaming response: the span stays open until the wrapped
+            # iterator is exhausted or closed.
+            result._iterator = _wrap_stream(result._iterator, span, include_prompts)
+            return result
+
+        with capture_internal_exceptions():
+            span.set_data(SPANDATA.AI_STREAMING, False)
+            if hasattr(result, "content"):
+                if include_prompts:
+                    span.set_data(
+                        SPANDATA.AI_RESPONSES,
+                        [
+                            {"type": block.type, "text": block.text}
+                            for block in result.content
+                            if hasattr(block, "text")
+                        ],
+                    )
+                _calculate_token_usage(result, span)
+            else:
+                span.set_data("unknown_response", True)
+        # Finished outside the guarded block so that an internal error while
+        # recording data can never leave the span open.
+        span.__exit__(None, None, None)
+
+        return result
+
+    return _sentry_patched_create
diff --git a/setup.py b/setup.py
index bef9842119..e10fe624e1 100644
--- a/setup.py
+++ b/setup.py
@@ -44,6 +44,7 @@ def get_file_text(file_name):
     ],
extras_require={ "aiohttp": ["aiohttp>=3.5"], + "anthropic": ["anthropic>=0.16"], "arq": ["arq>=0.23"], "asyncpg": ["asyncpg>=0.23"], "beam": ["apache-beam>=2.12"], diff --git a/tests/integrations/anthropic/__init__.py b/tests/integrations/anthropic/__init__.py new file mode 100644 index 0000000000..29ac4e6ff4 --- /dev/null +++ b/tests/integrations/anthropic/__init__.py @@ -0,0 +1,3 @@ +import pytest + +pytest.importorskip("anthropic") diff --git a/tests/integrations/anthropic/test_anthropic.py b/tests/integrations/anthropic/test_anthropic.py new file mode 100644 index 0000000000..10424771b6 --- /dev/null +++ b/tests/integrations/anthropic/test_anthropic.py @@ -0,0 +1,210 @@ +import pytest +from unittest import mock +from anthropic import Anthropic, Stream, AnthropicError +from anthropic.types import Usage, ContentBlock, MessageDeltaUsage, TextDelta +from anthropic.types.message import Message +from anthropic.types.message_start_event import MessageStartEvent +from anthropic.types.content_block_start_event import ContentBlockStartEvent +from anthropic.types.content_block_delta_event import ContentBlockDeltaEvent +from anthropic.types.content_block_stop_event import ContentBlockStopEvent +from anthropic.types.message_delta_event import MessageDeltaEvent, Delta + +from sentry_sdk import start_transaction +from sentry_sdk.consts import OP, SPANDATA +from sentry_sdk.integrations.anthropic import AnthropicIntegration + + +EXAMPLE_MESSAGE = Message( + id="id", + model="model", + role="assistant", + content=[ContentBlock(type="text", text="Hi, I'm Claude.")], + type="message", + usage=Usage(input_tokens=10, output_tokens=20), +) + + +@pytest.mark.parametrize( + "send_default_pii, include_prompts", + [ + (True, True), + (True, False), + (False, True), + (False, False), + ], +) +def test_nonstreaming_create_message( + sentry_init, capture_events, send_default_pii, include_prompts +): + sentry_init( + integrations=[AnthropicIntegration(include_prompts=include_prompts)], + 
traces_sample_rate=1.0, + send_default_pii=send_default_pii, + ) + events = capture_events() + client = Anthropic(api_key="z") + client.messages._post = mock.Mock(return_value=EXAMPLE_MESSAGE) + + messages = [ + { + "role": "user", + "content": "Hello, Claude", + } + ] + + with start_transaction(name="anthropic"): + response = client.messages.create( + max_tokens=1024, messages=messages, model="model" + ) + + assert response == EXAMPLE_MESSAGE + usage = response.usage + + assert usage.input_tokens == 10 + assert usage.output_tokens == 20 + + assert len(events) == 1 + (event,) = events + + assert event["type"] == "transaction" + assert event["transaction"] == "anthropic" + + assert len(event["spans"]) == 1 + (span,) = event["spans"] + + assert span["op"] == OP.ANTHROPIC_MESSAGES_CREATE + assert span["description"] == "Anthropic messages create" + assert span["data"][SPANDATA.AI_MODEL_ID] == "model" + + if send_default_pii and include_prompts: + assert span["data"][SPANDATA.AI_INPUT_MESSAGES] == messages + assert span["data"][SPANDATA.AI_RESPONSES] == [ + {"type": "text", "text": "Hi, I'm Claude."} + ] + else: + assert SPANDATA.AI_INPUT_MESSAGES not in span["data"] + assert SPANDATA.AI_RESPONSES not in span["data"] + + assert span["measurements"]["ai_prompt_tokens_used"]["value"] == 10 + assert span["measurements"]["ai_completion_tokens_used"]["value"] == 20 + assert span["measurements"]["ai_total_tokens_used"]["value"] == 30 + assert span["data"]["ai.streaming"] is False + + +@pytest.mark.parametrize( + "send_default_pii, include_prompts", + [ + (True, True), + (True, False), + (False, True), + (False, False), + ], +) +def test_streaming_create_message( + sentry_init, capture_events, send_default_pii, include_prompts +): + client = Anthropic(api_key="z") + returned_stream = Stream(cast_to=None, response=None, client=client) + returned_stream._iterator = [ + MessageStartEvent( + message=EXAMPLE_MESSAGE, + type="message_start", + ), + ContentBlockStartEvent( + 
type="content_block_start", + index=0, + content_block=ContentBlock(type="text", text=""), + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="Hi", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text=" I'm Claude!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockStopEvent(type="content_block_stop", index=0), + MessageDeltaEvent( + delta=Delta(), + usage=MessageDeltaUsage(output_tokens=10), + type="message_delta", + ), + ] + + sentry_init( + integrations=[AnthropicIntegration(include_prompts=include_prompts)], + traces_sample_rate=1.0, + send_default_pii=send_default_pii, + ) + events = capture_events() + client.messages._post = mock.Mock(return_value=returned_stream) + + messages = [ + { + "role": "user", + "content": "Hello, Claude", + } + ] + + with start_transaction(name="anthropic"): + message = client.messages.create( + max_tokens=1024, messages=messages, model="model", stream=True + ) + + for _ in message: + pass + + assert message == returned_stream + assert len(events) == 1 + (event,) = events + + assert event["type"] == "transaction" + assert event["transaction"] == "anthropic" + + assert len(event["spans"]) == 1 + (span,) = event["spans"] + + assert span["op"] == OP.ANTHROPIC_MESSAGES_CREATE + assert span["description"] == "Anthropic messages create" + assert span["data"][SPANDATA.AI_MODEL_ID] == "model" + + if send_default_pii and include_prompts: + assert span["data"][SPANDATA.AI_INPUT_MESSAGES] == messages + assert span["data"][SPANDATA.AI_RESPONSES] == [ + {"type": "text", "text": "Hi! 
I'm Claude!"} + ] + + else: + assert SPANDATA.AI_INPUT_MESSAGES not in span["data"] + assert SPANDATA.AI_RESPONSES not in span["data"] + + assert span["measurements"]["ai_prompt_tokens_used"]["value"] == 10 + assert span["measurements"]["ai_completion_tokens_used"]["value"] == 30 + assert span["measurements"]["ai_total_tokens_used"]["value"] == 40 + assert span["data"]["ai.streaming"] is True + + +def test_exception_message_create(sentry_init, capture_events): + sentry_init(integrations=[AnthropicIntegration()], traces_sample_rate=1.0) + events = capture_events() + + client = Anthropic(api_key="z") + client.messages._post = mock.Mock( + side_effect=AnthropicError("API rate limit reached") + ) + with pytest.raises(AnthropicError): + client.messages.create( + model="some-model", + messages=[{"role": "system", "content": "I'm throwing an exception"}], + max_tokens=1024, + ) + + (event,) = events + assert event["level"] == "error" diff --git a/tox.ini b/tox.ini index e373589736..47651c0faf 100644 --- a/tox.ini +++ b/tox.ini @@ -29,6 +29,10 @@ envlist = {py3.7,py3.9,py3.11}-aiohttp-v{3.8} {py3.8,py3.11}-aiohttp-latest + # Anthropic + {py3.7,py3.11,py3.12}-anthropic-v{0.16,0.25} + {py3.7,py3.11,py3.12}-anthropic-latest + # Ariadne {py3.8,py3.11}-ariadne-v{0.20} {py3.8,py3.11,py3.12}-ariadne-latest @@ -271,6 +275,11 @@ deps = aiohttp-v3.8: pytest-asyncio aiohttp-latest: pytest-asyncio + # Anthropic + anthropic-v0.25: anthropic~=0.25.0 + anthropic-v0.16: anthropic~=0.16.0 + anthropic-latest: anthropic + # Ariadne ariadne-v0.20: ariadne~=0.20.0 ariadne-latest: ariadne @@ -591,6 +600,7 @@ setenv = common: TESTPATH=tests gevent: TESTPATH=tests aiohttp: TESTPATH=tests/integrations/aiohttp + anthropic: TESTPATH=tests/integrations/anthropic ariadne: TESTPATH=tests/integrations/ariadne arq: TESTPATH=tests/integrations/arq asgi: TESTPATH=tests/integrations/asgi