feat(integrations): Add Anthropic Integration (#2831)
This PR adds an Anthropic integration. It supports the creation of messages in both streaming and non-streaming modes.
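
For reference, a minimal usage sketch (not part of this commit): the DSN, model id, and prompt are placeholders, and the integration is passed to sentry_sdk.init explicitly rather than relying on auto-enablement.

import sentry_sdk
from anthropic import Anthropic
from sentry_sdk.integrations.anthropic import AnthropicIntegration

sentry_sdk.init(
    dsn="...",  # placeholder DSN
    traces_sample_rate=1.0,
    # Prompts and responses are only recorded when sending PII is allowed
    # and include_prompts is True (the default).
    send_default_pii=True,
    integrations=[AnthropicIntegration(include_prompts=True)],
)

client = Anthropic()  # reads ANTHROPIC_API_KEY from the environment

with sentry_sdk.start_transaction(name="anthropic-demo"):
    # Non-streaming: one span with model id, token usage, and message data.
    message = client.messages.create(
        model="claude-3-opus-20240229",  # placeholder model id
        max_tokens=1024,
        messages=[{"role": "user", "content": "Hello, Claude"}],
    )

    # Streaming: the span stays open until the event stream is consumed.
    stream = client.messages.create(
        model="claude-3-opus-20240229",
        max_tokens=1024,
        messages=[{"role": "user", "content": "Hello, Claude"}],
        stream=True,
    )
    for event in stream:
        pass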

---------

Co-authored-by: Anton Pirker <anton.pirker@sentry.io>
czyber and antonpirker committed May 2, 2024
1 parent f98f77f commit eac253a
Showing 9 changed files with 406 additions and 0 deletions.
8 changes: 8 additions & 0 deletions .github/workflows/test-integrations-data-processing.yml
@@ -42,6 +42,10 @@ jobs:
- name: Erase coverage
run: |
coverage erase
- name: Test anthropic latest
run: |
set -x # print commands that are executed
./scripts/runtox.sh "py${{ matrix.python-version }}-anthropic-latest" --cov=tests --cov=sentry_sdk --cov-report= --cov-branch
- name: Test arq latest
run: |
set -x # print commands that are executed
@@ -102,6 +106,10 @@ jobs:
- name: Erase coverage
run: |
coverage erase
- name: Test anthropic pinned
run: |
set -x # print commands that are executed
./scripts/runtox.sh --exclude-latest "py${{ matrix.python-version }}-anthropic" --cov=tests --cov=sentry_sdk --cov-report= --cov-branch
- name: Test arq pinned
run: |
set -x # print commands that are executed
2 changes: 2 additions & 0 deletions mypy.ini
@@ -36,6 +36,8 @@ ignore_missing_imports = True
ignore_missing_imports = True
[mypy-aiohttp.*]
ignore_missing_imports = True
[mypy-anthropic.*]
ignore_missing_imports = True
[mypy-sanic.*]
ignore_missing_imports = True
[mypy-tornado.*]
1 change: 1 addition & 0 deletions scripts/split-tox-gh-actions/split-tox-gh-actions.py
@@ -66,6 +66,7 @@
"gcp",
],
"Data Processing": [
"anthropic",
"arq",
"beam",
"celery",
1 change: 1 addition & 0 deletions sentry_sdk/consts.py
@@ -296,6 +296,7 @@ class SPANDATA:


class OP:
ANTHROPIC_MESSAGES_CREATE = "ai.messages.create.anthropic"
CACHE_GET_ITEM = "cache.get_item"
DB = "db"
DB_REDIS = "db.redis"
170 changes: 170 additions & 0 deletions sentry_sdk/integrations/anthropic.py
@@ -0,0 +1,170 @@
from functools import wraps

import sentry_sdk
from sentry_sdk.ai.monitoring import record_token_usage
from sentry_sdk.consts import OP, SPANDATA
from sentry_sdk.integrations import DidNotEnable, Integration
from sentry_sdk.scope import should_send_default_pii
from sentry_sdk.utils import (
capture_internal_exceptions,
ensure_integration_enabled,
event_from_exception,
package_version,
)

from anthropic.resources import Messages

from typing import TYPE_CHECKING

if TYPE_CHECKING:
from typing import Any, Iterator
from anthropic.types import MessageStreamEvent
from sentry_sdk.tracing import Span


class AnthropicIntegration(Integration):
identifier = "anthropic"

def __init__(self, include_prompts=True):
# type: (AnthropicIntegration, bool) -> None
self.include_prompts = include_prompts

@staticmethod
def setup_once():
# type: () -> None
version = package_version("anthropic")

if version is None:
raise DidNotEnable("Unparsable anthropic version.")

if version < (0, 16):
raise DidNotEnable("anthropic 0.16 or newer required.")

Messages.create = _wrap_message_create(Messages.create)


def _capture_exception(exc):
# type: (Any) -> None
event, hint = event_from_exception(
exc,
client_options=sentry_sdk.get_client().options,
mechanism={"type": "anthropic", "handled": False},
)
sentry_sdk.capture_event(event, hint=hint)


def _calculate_token_usage(result, span):
    # type: (Any, Span) -> None
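    # Read usage defensively: the fields may be missing depending on the
    # shape of the returned object and the installed anthropic version.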
input_tokens = 0
output_tokens = 0
if hasattr(result, "usage"):
usage = result.usage
if hasattr(usage, "input_tokens") and isinstance(usage.input_tokens, int):
input_tokens = usage.input_tokens
if hasattr(usage, "output_tokens") and isinstance(usage.output_tokens, int):
output_tokens = usage.output_tokens

total_tokens = input_tokens + output_tokens
record_token_usage(span, input_tokens, output_tokens, total_tokens)


def _wrap_message_create(f):
# type: (Any) -> Any
@wraps(f)
@ensure_integration_enabled(AnthropicIntegration, f)
def _sentry_patched_create(*args, **kwargs):
# type: (*Any, **Any) -> Any
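        # Fall back to the unwrapped call when there are no iterable
        # messages to record.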
if "messages" not in kwargs:
return f(*args, **kwargs)

try:
iter(kwargs["messages"])
except TypeError:
return f(*args, **kwargs)

messages = list(kwargs["messages"])
model = kwargs.get("model")

span = sentry_sdk.start_span(
op=OP.ANTHROPIC_MESSAGES_CREATE, description="Anthropic messages create"
)
span.__enter__()
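        # The span is entered manually (not via `with`) so it can outlive
        # this call for streaming responses; every branch below is
        # responsible for calling span.__exit__().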

try:
result = f(*args, **kwargs)
except Exception as exc:
_capture_exception(exc)
span.__exit__(None, None, None)
raise exc from None

integration = sentry_sdk.get_client().get_integration(AnthropicIntegration)

with capture_internal_exceptions():
span.set_data(SPANDATA.AI_MODEL_ID, model)
span.set_data(SPANDATA.AI_STREAMING, False)
if should_send_default_pii() and integration.include_prompts:
span.set_data(SPANDATA.AI_INPUT_MESSAGES, messages)
if hasattr(result, "content"):
if should_send_default_pii() and integration.include_prompts:
span.set_data(
SPANDATA.AI_RESPONSES,
list(
map(
lambda message: {
"type": message.type,
"text": message.text,
},
result.content,
)
),
)
_calculate_token_usage(result, span)
span.__exit__(None, None, None)
elif hasattr(result, "_iterator"):
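                # Streaming response: wrap the stream's private iterator so
                # the span stays open while events flow back to the caller.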
old_iterator = result._iterator

def new_iterator():
# type: () -> Iterator[MessageStreamEvent]
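                    # Tally token usage and collect text deltas as events
                    # pass through; all events are re-yielded except
                    # message_stop, which is consumed. The span is closed
                    # once the stream is exhausted.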
input_tokens = 0
output_tokens = 0
content_blocks = []
with capture_internal_exceptions():
for event in old_iterator:
if hasattr(event, "type"):
if event.type == "message_start":
usage = event.message.usage
input_tokens += usage.input_tokens
output_tokens += usage.output_tokens
elif event.type == "content_block_start":
pass
elif event.type == "content_block_delta":
content_blocks.append(event.delta.text)
elif event.type == "content_block_stop":
pass
elif event.type == "message_delta":
output_tokens += event.usage.output_tokens
elif event.type == "message_stop":
continue
yield event

if should_send_default_pii() and integration.include_prompts:
complete_message = "".join(content_blocks)
span.set_data(
SPANDATA.AI_RESPONSES,
[{"type": "text", "text": complete_message}],
)
total_tokens = input_tokens + output_tokens
record_token_usage(
span, input_tokens, output_tokens, total_tokens
)
span.set_data(SPANDATA.AI_STREAMING, True)
span.__exit__(None, None, None)

result._iterator = new_iterator()
else:
span.set_data("unknown_response", True)
span.__exit__(None, None, None)

return result

return _sentry_patched_create
1 change: 1 addition & 0 deletions setup.py
@@ -44,6 +44,7 @@ def get_file_text(file_name):
],
extras_require={
"aiohttp": ["aiohttp>=3.5"],
"anthropic": ["anthropic>=0.16"],
"arq": ["arq>=0.23"],
"asyncpg": ["asyncpg>=0.23"],
"beam": ["apache-beam>=2.12"],
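With the new extra, the integration's dependency can be installed alongside the SDK via pip install "sentry-sdk[anthropic]" (a standard setuptools extras declaration).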
3 changes: 3 additions & 0 deletions tests/integrations/anthropic/__init__.py
@@ -0,0 +1,3 @@
import pytest

pytest.importorskip("anthropic")
