Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(integrations): Add Anthropic Integration #2831

Merged
merged 13 commits into from May 2, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 8 additions & 0 deletions .github/workflows/test-integrations-data-processing.yml
Expand Up @@ -42,6 +42,10 @@ jobs:
- name: Erase coverage
run: |
coverage erase
- name: Test anthropic latest
run: |
set -x # print commands that are executed
./scripts/runtox.sh "py${{ matrix.python-version }}-anthropic-latest" --cov=tests --cov=sentry_sdk --cov-report= --cov-branch
- name: Test arq latest
run: |
set -x # print commands that are executed
Expand Down Expand Up @@ -98,6 +102,10 @@ jobs:
- name: Erase coverage
run: |
coverage erase
- name: Test anthropic pinned
run: |
set -x # print commands that are executed
./scripts/runtox.sh --exclude-latest "py${{ matrix.python-version }}-anthropic" --cov=tests --cov=sentry_sdk --cov-report= --cov-branch
- name: Test arq pinned
run: |
set -x # print commands that are executed
Expand Down
2 changes: 2 additions & 0 deletions mypy.ini
Expand Up @@ -36,6 +36,8 @@ ignore_missing_imports = True
ignore_missing_imports = True
[mypy-aiohttp.*]
ignore_missing_imports = True
[mypy-anthropic.*]
ignore_missing_imports = True
[mypy-sanic.*]
ignore_missing_imports = True
[mypy-tornado.*]
Expand Down
1 change: 1 addition & 0 deletions scripts/split-tox-gh-actions/split-tox-gh-actions.py
Expand Up @@ -66,6 +66,7 @@
"gcp",
],
"Data Processing": [
"anthropic",
"arq",
"beam",
"celery",
Expand Down
1 change: 1 addition & 0 deletions sentry_sdk/consts.py
Expand Up @@ -217,6 +217,7 @@ class SPANDATA:


class OP:
ANTHROPIC_MESSAGES_CREATE = "ai.messages.create.anthropic"
CACHE_GET_ITEM = "cache.get_item"
DB = "db"
DB_REDIS = "db.redis"
Expand Down
179 changes: 179 additions & 0 deletions sentry_sdk/integrations/anthropic.py
@@ -0,0 +1,179 @@
from functools import wraps

import sentry_sdk
from sentry_sdk.consts import OP
from sentry_sdk.integrations import DidNotEnable, Integration
from sentry_sdk.scope import should_send_default_pii
from sentry_sdk.utils import (
capture_internal_exceptions,
ensure_integration_enabled,
event_from_exception,
package_version,
)

from anthropic.resources import Messages

from typing import TYPE_CHECKING

if TYPE_CHECKING:
from typing import Any, Iterator
from anthropic.types import MessageStreamEvent
from sentry_sdk.tracing import Span


COMPLETION_TOKENS_USED = "ai.completion_tоkens.used"
PROMPT_TOKENS_USED = "ai.prompt_tоkens.used"
TOTAL_TOKENS_USED = "ai.total_tоkens.used"


class AnthropicIntegration(Integration):
identifier = "anthropic"

def __init__(self, include_prompts=True):
# type: (AnthropicIntegration, bool) -> None
self.include_prompts = include_prompts

@staticmethod
def setup_once():
# type: () -> None
version = package_version("anthropic")

if version is None:
raise DidNotEnable("Unparsable anthropic version.")

if version < (0, 16):
raise DidNotEnable("anthropic 0.16 or newer required.")

Messages.create = _wrap_message_create(Messages.create)


def _capture_exception(exc):
# type: (Any) -> None
event, hint = event_from_exception(
exc,
client_options=sentry_sdk.get_client().options,
mechanism={"type": "anthropic", "handled": False},
)
sentry_sdk.capture_event(event, hint=hint)


def _calculate_token_usage(result, span):
# type: (Messages, Span) -> None
input_tokens = 0
output_tokens = 0
if hasattr(result, "usage"):
usage = result.usage
if hasattr(usage, "input_tokens") and isinstance(usage.input_tokens, int):
input_tokens = usage.input_tokens
if hasattr(usage, "output_tokens") and isinstance(usage.output_tokens, int):
output_tokens = usage.output_tokens

total_tokens = input_tokens + output_tokens

if total_tokens != 0:
span.set_data(TOTAL_TOKENS_USED, total_tokens)
if input_tokens != 0:
span.set_data(PROMPT_TOKENS_USED, input_tokens)
if output_tokens != 0:
span.set_data(COMPLETION_TOKENS_USED, output_tokens)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.



def _wrap_message_create(f):
# type: (Any) -> Any
@wraps(f)
@ensure_integration_enabled(AnthropicIntegration, f)
def _sentry_patched_create(*args, **kwargs):
# type: (*Any, **Any) -> Any
if "messages" not in kwargs:
return f(*args, **kwargs)

try:
iter(kwargs["messages"])
except TypeError:
return f(*args, **kwargs)

messages = list(kwargs["messages"])
model = kwargs.get("model")

span = sentry_sdk.start_span(
op=OP.ANTHROPIC_MESSAGES_CREATE, description="Anthropic messages create"
)
span.__enter__()

try:
result = f(*args, **kwargs)
except Exception as exc:
_capture_exception(exc)
span.__exit__(None, None, None)
raise exc from None

integration = sentry_sdk.get_client().get_integration(AnthropicIntegration)

with capture_internal_exceptions():
span.set_data("ai.model_id", model)
span.set_data("ai.streaming", False)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if should_send_default_pii() and integration.include_prompts:
span.set_data("ai.input_messages", messages)
if hasattr(result, "content"):
if should_send_default_pii() and integration.include_prompts:
span.set_data(
"ai.responses",
list(
map(
lambda message: {
"type": message.type,
"text": message.text,
},
result.content,
)
),
)
_calculate_token_usage(result, span)
span.__exit__(None, None, None)
elif hasattr(result, "_iterator"):
old_iterator = result._iterator

def new_iterator():
# type: () -> Iterator[MessageStreamEvent]
input_tokens = 0
output_tokens = 0
content_blocks = []
with capture_internal_exceptions():
for event in old_iterator:
if hasattr(event, "type"):
if event.type == "message_start":
usage = event.message.usage
input_tokens += usage.input_tokens
output_tokens += usage.output_tokens
elif event.type == "content_block_start":
pass
elif event.type == "content_block_delta":
content_blocks.append(event.delta.text)
elif event.type == "content_block_stop":
pass
elif event.type == "message_delta":
output_tokens += event.usage.output_tokens
elif event.type == "message_stop":
continue
yield event

if should_send_default_pii() and integration.include_prompts:
complete_message = "".join(content_blocks)
span.set_data(
"ai.responses",
[{"type": "text", "text": complete_message}],
)
span.set_data(TOTAL_TOKENS_USED, input_tokens + output_tokens)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New helper function again (after langchain merged)

span.set_data(PROMPT_TOKENS_USED, input_tokens)
span.set_data(COMPLETION_TOKENS_USED, output_tokens)
span.set_data("ai.streaming", True)
span.__exit__(None, None, None)

result._iterator = new_iterator()
else:
span.set_data("unknown_response", True)
span.__exit__(None, None, None)

return result

return _sentry_patched_create
1 change: 1 addition & 0 deletions setup.py
Expand Up @@ -44,6 +44,7 @@ def get_file_text(file_name):
],
extras_require={
"aiohttp": ["aiohttp>=3.5"],
"anthropic": ["anthropic>=0.16"],
"arq": ["arq>=0.23"],
"asyncpg": ["asyncpg>=0.23"],
"beam": ["apache-beam>=2.12"],
Expand Down
3 changes: 3 additions & 0 deletions tests/integrations/anthropic/__init__.py
@@ -0,0 +1,3 @@
import pytest

pytest.importorskip("anthropic")