Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(integrations): Add integration for asyncpg #2314

Merged
merged 30 commits into from
Sep 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
00d6ad9
feat(integrations): Add integration for asyncpg >= 0.23.0 (WIP)
mimre25 Aug 19, 2023
4a028a2
Merge branch 'master' into asyncpg-integration
antonpirker Aug 30, 2023
63b48b0
Added asyncpg to test matrix
antonpirker Aug 30, 2023
d982755
Added dependency for tests
antonpirker Aug 30, 2023
0d45020
Added dependency for tests
antonpirker Aug 30, 2023
9756b69
test(asyncpg-integration): Read test PG connection params from enviro…
mimre25 Sep 2, 2023
44ef217
feat(asyncpg-integration): Add recording of "executemany" information
mimre25 Sep 2, 2023
8f8227a
fix(asyncpg-integration): Fix recording of span durations for asyncpg
mimre25 Sep 2, 2023
7089e6d
feat(integrations): Allow installing sentry-sdk[asyncpg]
mimre25 Sep 2, 2023
3dbe886
feat(asyncpg-integration): Add proper spans for cursors
mimre25 Sep 2, 2023
de6d835
feat(asyncpg-integration): Record calls to execute without params
mimre25 Sep 3, 2023
661ca50
feat(asyncpg-integration): Add tracve recording for connect calls
mimre25 Sep 3, 2023
8fa3d03
refactor(asyncpg-integration): Extract duplicated code into function
mimre25 Sep 3, 2023
d5656f9
fix(typing): Fix type annotations for asyncpg integration
mimre25 Sep 3, 2023
00c3f97
Merge branch 'master' into asyncpg-integration
antonpirker Sep 4, 2023
52aa3a0
Merge branch 'master' into asyncpg-integration
antonpirker Sep 5, 2023
63b58dc
Added db span data
antonpirker Sep 5, 2023
c528904
Linting fix
antonpirker Sep 5, 2023
c64f039
Trying to remove ParamSpec
antonpirker Sep 5, 2023
fd1bf6b
Reformat
antonpirker Sep 5, 2023
d179931
Fixed typing and more db span data recording.
antonpirker Sep 5, 2023
ea524dd
Removed syntax not known to python 2
antonpirker Sep 5, 2023
97d9eab
Merge branch 'master' into asyncpg-integration
antonpirker Sep 6, 2023
bd75d0a
Merge branch 'master' into asyncpg-integration
antonpirker Sep 6, 2023
07161a1
fix(tests): Fix expected results for asyncpg connect tests
mimre25 Sep 9, 2023
f04a9c7
fix(asyncpg-integration): Remove recording of every cursor execute
mimre25 Sep 9, 2023
4c977dd
fix(typing): Import __future__ annotations to allow modern type hints
mimre25 Sep 9, 2023
47aa451
fix(typing): Fix type hints for asyncpg integration
mimre25 Sep 9, 2023
9715829
Merge branch 'master' into asyncpg-integration
antonpirker Sep 11, 2023
89c1023
Added postgres to asyncpg tests
antonpirker Sep 11, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
102 changes: 102 additions & 0 deletions .github/workflows/test-integration-asyncpg.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
name: Test asyncpg

on:
push:
branches:
- master
- release/**

pull_request:

# Cancel in progress workflows on pull_requests.
# https://docs.github.com/en/actions/using-jobs/using-concurrency#example-using-a-fallback-value
concurrency:
group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }}
cancel-in-progress: true

permissions:
contents: read

env:
BUILD_CACHE_KEY: ${{ github.sha }}
CACHED_BUILD_PATHS: |
${{ github.workspace }}/dist-serverless

jobs:
test:
name: asyncpg, python ${{ matrix.python-version }}, ${{ matrix.os }}
runs-on: ${{ matrix.os }}
timeout-minutes: 30

strategy:
fail-fast: false
matrix:
python-version: ["3.7","3.8","3.9","3.10","3.11"]
# python3.6 reached EOL and is no longer being supported on
# new versions of hosted runners on Github Actions
# ubuntu-20.04 is the last version that supported python3.6
# see https://github.com/actions/setup-python/issues/544#issuecomment-1332535877
os: [ubuntu-20.04]
services:
postgres:
image: postgres
env:
POSTGRES_PASSWORD: sentry
# Set health checks to wait until postgres has started
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
# Maps tcp port 5432 on service container to the host
ports:
- 5432:5432
env:
SENTRY_PYTHON_TEST_POSTGRES_USER: postgres
SENTRY_PYTHON_TEST_POSTGRES_PASSWORD: sentry
SENTRY_PYTHON_TEST_POSTGRES_NAME: ci_test
SENTRY_PYTHON_TEST_POSTGRES_HOST: localhost

steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}

- name: Setup Test Env
run: |
pip install coverage "tox>=3,<4"

- name: Test asyncpg
uses: nick-fields/retry@v2
with:
timeout_minutes: 15
max_attempts: 2
retry_wait_seconds: 5
shell: bash
command: |
set -x # print commands that are executed
coverage erase

# Run tests
./scripts/runtox.sh "py${{ matrix.python-version }}-asyncpg" --cov=tests --cov=sentry_sdk --cov-report= --cov-branch &&
coverage combine .coverage* &&
coverage xml -i

- uses: codecov/codecov-action@v3
with:
token: ${{ secrets.CODECOV_TOKEN }}
files: coverage.xml


check_required_tests:
name: All asyncpg tests passed or skipped
needs: test
# Always run this, even if a dependent job failed
if: always()
runs-on: ubuntu-20.04
steps:
- name: Check for failures
if: contains(needs.test.result, 'failure')
run: |
echo "One of the dependent jobs has failed. You may need to re-run it." && exit 1
5 changes: 4 additions & 1 deletion scripts/split-tox-gh-actions/split-tox-gh-actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@
TEMPLATE_SNIPPET_TEST = TEMPLATE_DIR / "ci-yaml-test-snippet.txt"
TEMPLATE_SNIPPET_TEST_PY27 = TEMPLATE_DIR / "ci-yaml-test-py27-snippet.txt"

FRAMEWORKS_NEEDING_POSTGRES = ["django"]
FRAMEWORKS_NEEDING_POSTGRES = [
"django",
"asyncpg",
]

MATRIX_DEFINITION = """
strategy:
Expand Down
7 changes: 7 additions & 0 deletions sentry_sdk/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@ class SPANDATA:
Example: myDatabase
"""

DB_USER = "db.user"
"""
The name of the database user used for connecting to the database.
See: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/database.md
Example: my_user
"""

DB_OPERATION = "db.operation"
"""
The name of the operation being executed, e.g. the MongoDB command name such as findAndModify, or the SQL keyword.
Expand Down
202 changes: 202 additions & 0 deletions sentry_sdk/integrations/asyncpg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
from __future__ import annotations
import contextlib
from typing import Any, TypeVar, Callable, Awaitable, Iterator

from asyncpg.cursor import BaseCursor # type: ignore

from sentry_sdk import Hub
from sentry_sdk.consts import OP, SPANDATA
from sentry_sdk.integrations import Integration, DidNotEnable
from sentry_sdk.tracing import Span
from sentry_sdk.tracing_utils import record_sql_queries
from sentry_sdk.utils import parse_version, capture_internal_exceptions

try:
import asyncpg # type: ignore[import]

except ImportError:
raise DidNotEnable("asyncpg not installed.")

# asyncpg.__version__ is a string containing the semantic version in the form of "<major>.<minor>.<patch>"
asyncpg_version = parse_version(asyncpg.__version__)

if asyncpg_version is not None and asyncpg_version < (0, 23, 0):
raise DidNotEnable("asyncpg >= 0.23.0 required")


class AsyncPGIntegration(Integration):
identifier = "asyncpg"
_record_params = False

def __init__(self, *, record_params: bool = False):
AsyncPGIntegration._record_params = record_params

@staticmethod
def setup_once() -> None:
asyncpg.Connection.execute = _wrap_execute(
asyncpg.Connection.execute,
)

asyncpg.Connection._execute = _wrap_connection_method(
asyncpg.Connection._execute
)
asyncpg.Connection._executemany = _wrap_connection_method(
asyncpg.Connection._executemany, executemany=True
)
asyncpg.Connection.cursor = _wrap_cursor_creation(asyncpg.Connection.cursor)
asyncpg.Connection.prepare = _wrap_connection_method(asyncpg.Connection.prepare)
asyncpg.connect_utils._connect_addr = _wrap_connect_addr(
asyncpg.connect_utils._connect_addr
)


T = TypeVar("T")


def _wrap_execute(f: Callable[..., Awaitable[T]]) -> Callable[..., Awaitable[T]]:
async def _inner(*args: Any, **kwargs: Any) -> T:
hub = Hub.current
integration = hub.get_integration(AsyncPGIntegration)

# Avoid recording calls to _execute twice.
# Calls to Connection.execute with args also call
# Connection._execute, which is recorded separately
# args[0] = the connection object, args[1] is the query
if integration is None or len(args) > 2:
return await f(*args, **kwargs)

query = args[1]
with record_sql_queries(hub, None, query, None, None, executemany=False):
res = await f(*args, **kwargs)
return res

return _inner


SubCursor = TypeVar("SubCursor", bound=BaseCursor)


@contextlib.contextmanager
def _record(
hub: Hub,
cursor: SubCursor | None,
query: str,
params_list: tuple[Any, ...] | None,
*,
executemany: bool = False,
) -> Iterator[Span]:
integration = hub.get_integration(AsyncPGIntegration)
if not integration._record_params:
params_list = None

param_style = "pyformat" if params_list else None

with record_sql_queries(
hub,
cursor,
query,
params_list,
param_style,
executemany=executemany,
record_cursor_repr=cursor is not None,
) as span:
yield span


def _wrap_connection_method(
f: Callable[..., Awaitable[T]], *, executemany: bool = False
) -> Callable[..., Awaitable[T]]:
async def _inner(*args: Any, **kwargs: Any) -> T:
hub = Hub.current
integration = hub.get_integration(AsyncPGIntegration)

if integration is None:
return await f(*args, **kwargs)

query = args[1]
params_list = args[2] if len(args) > 2 else None
with _record(hub, None, query, params_list, executemany=executemany) as span:
_set_db_data(span, args[0])
res = await f(*args, **kwargs)
return res

return _inner


def _wrap_cursor_creation(f: Callable[..., T]) -> Callable[..., T]:
def _inner(*args: Any, **kwargs: Any) -> T: # noqa: N807
hub = Hub.current
integration = hub.get_integration(AsyncPGIntegration)

if integration is None:
return f(*args, **kwargs)

query = args[1]
params_list = args[2] if len(args) > 2 else None

with _record(
hub,
None,
query,
params_list,
executemany=False,
) as span:
_set_db_data(span, args[0])
res = f(*args, **kwargs)
antonpirker marked this conversation as resolved.
Show resolved Hide resolved
span.set_data("db.cursor", res)

return res

return _inner


def _wrap_connect_addr(f: Callable[..., Awaitable[T]]) -> Callable[..., Awaitable[T]]:
async def _inner(*args: Any, **kwargs: Any) -> T:
hub = Hub.current
integration = hub.get_integration(AsyncPGIntegration)

if integration is None:
return await f(*args, **kwargs)

user = kwargs["params"].user
database = kwargs["params"].database

with hub.start_span(op=OP.DB, description="connect") as span:
span.set_data(SPANDATA.DB_SYSTEM, "postgresql")
addr = kwargs.get("addr")
if addr:
try:
span.set_data(SPANDATA.SERVER_ADDRESS, addr[0])
span.set_data(SPANDATA.SERVER_PORT, addr[1])
except IndexError:
pass
span.set_data(SPANDATA.DB_NAME, database)
span.set_data(SPANDATA.DB_USER, user)

with capture_internal_exceptions():
hub.add_breadcrumb(message="connect", category="query", data=span._data)
res = await f(*args, **kwargs)

return res

return _inner


def _set_db_data(span: Span, conn: Any) -> None:
span.set_data(SPANDATA.DB_SYSTEM, "postgresql")

addr = conn._addr
if addr:
try:
span.set_data(SPANDATA.SERVER_ADDRESS, addr[0])
span.set_data(SPANDATA.SERVER_PORT, addr[1])
except IndexError:
pass

database = conn._params.database
if database:
span.set_data(SPANDATA.DB_NAME, database)

user = conn._params.user
if user:
span.set_data(SPANDATA.DB_USER, user)
3 changes: 3 additions & 0 deletions sentry_sdk/tracing_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ def record_sql_queries(
params_list, # type: Any
paramstyle, # type: Optional[str]
executemany, # type: bool
record_cursor_repr=False, # type: bool
):
# type: (...) -> Generator[sentry_sdk.tracing.Span, None, None]

Expand All @@ -132,6 +133,8 @@ def record_sql_queries(
data["db.paramstyle"] = paramstyle
if executemany:
data["db.executemany"] = True
if record_cursor_repr and cursor is not None:
data["db.cursor"] = cursor
antonpirker marked this conversation as resolved.
Show resolved Hide resolved

with capture_internal_exceptions():
hub.add_breadcrumb(message=query, category="query", data=data)
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ def get_file_text(file_name):
extras_require={
"aiohttp": ["aiohttp>=3.5"],
"arq": ["arq>=0.23"],
"asyncpg": ["asyncpg>=0.23"],
"beam": ["apache-beam>=2.12"],
"bottle": ["bottle>=0.12.13"],
"celery": ["celery>=3"],
Expand Down
3 changes: 3 additions & 0 deletions tests/integrations/asyncpg/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import pytest

pytest.importorskip("asyncpg")