Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(integrations): Add Anthropic Integration #2831

Merged
merged 13 commits into from May 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 8 additions & 0 deletions .github/workflows/test-integrations-data-processing.yml
Expand Up @@ -42,6 +42,10 @@ jobs:
- name: Erase coverage
run: |
coverage erase
- name: Test anthropic latest
run: |
set -x # print commands that are executed
./scripts/runtox.sh "py${{ matrix.python-version }}-anthropic-latest" --cov=tests --cov=sentry_sdk --cov-report= --cov-branch
- name: Test arq latest
run: |
set -x # print commands that are executed
Expand Down Expand Up @@ -102,6 +106,10 @@ jobs:
- name: Erase coverage
run: |
coverage erase
- name: Test anthropic pinned
run: |
set -x # print commands that are executed
./scripts/runtox.sh --exclude-latest "py${{ matrix.python-version }}-anthropic" --cov=tests --cov=sentry_sdk --cov-report= --cov-branch
- name: Test arq pinned
run: |
set -x # print commands that are executed
Expand Down
2 changes: 2 additions & 0 deletions mypy.ini
Expand Up @@ -36,6 +36,8 @@ ignore_missing_imports = True
ignore_missing_imports = True
[mypy-aiohttp.*]
ignore_missing_imports = True
[mypy-anthropic.*]
ignore_missing_imports = True
[mypy-sanic.*]
ignore_missing_imports = True
[mypy-tornado.*]
Expand Down
1 change: 1 addition & 0 deletions scripts/split-tox-gh-actions/split-tox-gh-actions.py
Expand Up @@ -66,6 +66,7 @@
"gcp",
],
"Data Processing": [
"anthropic",
"arq",
"beam",
"celery",
Expand Down
1 change: 1 addition & 0 deletions sentry_sdk/consts.py
Expand Up @@ -296,6 +296,7 @@ class SPANDATA:


class OP:
ANTHROPIC_MESSAGES_CREATE = "ai.messages.create.anthropic"
CACHE_GET_ITEM = "cache.get_item"
DB = "db"
DB_REDIS = "db.redis"
Expand Down
170 changes: 170 additions & 0 deletions sentry_sdk/integrations/anthropic.py
@@ -0,0 +1,170 @@
from functools import wraps

import sentry_sdk
from sentry_sdk.ai.monitoring import record_token_usage
from sentry_sdk.consts import OP, SPANDATA
from sentry_sdk.integrations import DidNotEnable, Integration
from sentry_sdk.scope import should_send_default_pii
from sentry_sdk.utils import (
capture_internal_exceptions,
ensure_integration_enabled,
event_from_exception,
package_version,
)

from anthropic.resources import Messages

from typing import TYPE_CHECKING

if TYPE_CHECKING:
from typing import Any, Iterator
from anthropic.types import MessageStreamEvent
from sentry_sdk.tracing import Span


class AnthropicIntegration(Integration):
identifier = "anthropic"

def __init__(self, include_prompts=True):
# type: (AnthropicIntegration, bool) -> None
self.include_prompts = include_prompts

@staticmethod
def setup_once():
# type: () -> None
version = package_version("anthropic")

if version is None:
raise DidNotEnable("Unparsable anthropic version.")

if version < (0, 16):
raise DidNotEnable("anthropic 0.16 or newer required.")

Messages.create = _wrap_message_create(Messages.create)


def _capture_exception(exc):
# type: (Any) -> None
event, hint = event_from_exception(
exc,
client_options=sentry_sdk.get_client().options,
mechanism={"type": "anthropic", "handled": False},
)
sentry_sdk.capture_event(event, hint=hint)


def _calculate_token_usage(result, span):
# type: (Messages, Span) -> None
input_tokens = 0
output_tokens = 0
if hasattr(result, "usage"):
usage = result.usage
if hasattr(usage, "input_tokens") and isinstance(usage.input_tokens, int):
input_tokens = usage.input_tokens
if hasattr(usage, "output_tokens") and isinstance(usage.output_tokens, int):
output_tokens = usage.output_tokens

total_tokens = input_tokens + output_tokens
record_token_usage(span, input_tokens, output_tokens, total_tokens)


def _wrap_message_create(f):
# type: (Any) -> Any
@wraps(f)
@ensure_integration_enabled(AnthropicIntegration, f)
def _sentry_patched_create(*args, **kwargs):
# type: (*Any, **Any) -> Any
if "messages" not in kwargs:
return f(*args, **kwargs)

try:
iter(kwargs["messages"])
except TypeError:
return f(*args, **kwargs)

messages = list(kwargs["messages"])
model = kwargs.get("model")

span = sentry_sdk.start_span(
op=OP.ANTHROPIC_MESSAGES_CREATE, description="Anthropic messages create"
)
span.__enter__()

try:
result = f(*args, **kwargs)
except Exception as exc:
_capture_exception(exc)
span.__exit__(None, None, None)
raise exc from None

integration = sentry_sdk.get_client().get_integration(AnthropicIntegration)

with capture_internal_exceptions():
span.set_data(SPANDATA.AI_MODEL_ID, model)
span.set_data(SPANDATA.AI_STREAMING, False)
if should_send_default_pii() and integration.include_prompts:
span.set_data(SPANDATA.AI_INPUT_MESSAGES, messages)
if hasattr(result, "content"):
if should_send_default_pii() and integration.include_prompts:
span.set_data(
SPANDATA.AI_RESPONSES,
list(
map(
lambda message: {
"type": message.type,
"text": message.text,
},
result.content,
)
),
)
_calculate_token_usage(result, span)
span.__exit__(None, None, None)
elif hasattr(result, "_iterator"):
old_iterator = result._iterator

def new_iterator():
# type: () -> Iterator[MessageStreamEvent]
input_tokens = 0
output_tokens = 0
content_blocks = []
with capture_internal_exceptions():
for event in old_iterator:
if hasattr(event, "type"):
if event.type == "message_start":
usage = event.message.usage
input_tokens += usage.input_tokens
output_tokens += usage.output_tokens
elif event.type == "content_block_start":
pass
elif event.type == "content_block_delta":
content_blocks.append(event.delta.text)
elif event.type == "content_block_stop":
pass
elif event.type == "message_delta":
output_tokens += event.usage.output_tokens
elif event.type == "message_stop":
continue
yield event

if should_send_default_pii() and integration.include_prompts:
complete_message = "".join(content_blocks)
span.set_data(
SPANDATA.AI_RESPONSES,
[{"type": "text", "text": complete_message}],
)
total_tokens = input_tokens + output_tokens
record_token_usage(
span, input_tokens, output_tokens, total_tokens
)
span.set_data(SPANDATA.AI_STREAMING, True)
span.__exit__(None, None, None)

result._iterator = new_iterator()
else:
span.set_data("unknown_response", True)
span.__exit__(None, None, None)

return result

return _sentry_patched_create
1 change: 1 addition & 0 deletions setup.py
Expand Up @@ -44,6 +44,7 @@ def get_file_text(file_name):
],
extras_require={
"aiohttp": ["aiohttp>=3.5"],
"anthropic": ["anthropic>=0.16"],
"arq": ["arq>=0.23"],
"asyncpg": ["asyncpg>=0.23"],
"beam": ["apache-beam>=2.12"],
Expand Down
3 changes: 3 additions & 0 deletions tests/integrations/anthropic/__init__.py
@@ -0,0 +1,3 @@
import pytest

pytest.importorskip("anthropic")