Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(integrations): add support for cluster clients from redis sdk #2394

Merged
merged 13 commits into from
Dec 7, 2023
150 changes: 125 additions & 25 deletions sentry_sdk/integrations/redis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,13 @@
)

if TYPE_CHECKING:
from collections.abc import Callable

Check warning on line 16 in sentry_sdk/integrations/redis/__init__.py

View check run for this annotation

Codecov / codecov/patch

sentry_sdk/integrations/redis/__init__.py#L16

Added line #L16 was not covered by tests
from typing import Any, Dict, Sequence
from redis import Redis, RedisCluster
from redis.asyncio.cluster import (

Check warning on line 19 in sentry_sdk/integrations/redis/__init__.py

View check run for this annotation

Codecov / codecov/patch

sentry_sdk/integrations/redis/__init__.py#L18-L19

Added lines #L18 - L19 were not covered by tests
RedisCluster as AsyncRedisCluster,
ClusterPipeline as AsyncClusterPipeline,
)
from sentry_sdk.tracing import Span

_SINGLE_KEY_COMMANDS = frozenset(
Expand Down Expand Up @@ -83,8 +89,7 @@
):
# type: (Span, bool, Any, bool, Sequence[Any]) -> None
span.set_tag("redis.is_cluster", is_cluster)
transaction = is_transaction if not is_cluster else False
span.set_tag("redis.transaction", transaction)
span.set_tag("redis.transaction", is_transaction)

commands = []
for i, arg in enumerate(command_stack):
Expand Down Expand Up @@ -118,7 +123,7 @@
span.set_tag("redis.key", args[0])


def _set_db_data(span, connection_params):
def _set_db_data_on_span(span, connection_params):
# type: (Span, Dict[str, Any]) -> None
span.set_data(SPANDATA.DB_SYSTEM, "redis")

Expand All @@ -135,8 +140,42 @@
span.set_data(SPANDATA.SERVER_PORT, port)


def patch_redis_pipeline(pipeline_cls, is_cluster, get_command_args_fn):
# type: (Any, bool, Any) -> None
def _set_db_data(span, redis_instance):
# type: (Span, Redis[Any]) -> None
try:
_set_db_data_on_span(span, redis_instance.connection_pool.connection_kwargs)
except AttributeError:
pass # connections_kwargs may be missing in some cases

Check warning on line 148 in sentry_sdk/integrations/redis/__init__.py

View check run for this annotation

Codecov / codecov/patch

sentry_sdk/integrations/redis/__init__.py#L147-L148

Added lines #L147 - L148 were not covered by tests


def _set_cluster_db_data(span, redis_cluster_instance):
# type: (Span, RedisCluster[Any]) -> None
default_node = redis_cluster_instance.get_default_node()
if default_node is not None:
_set_db_data_on_span(
span, {"host": default_node.host, "port": default_node.port}
szokeasaurusrex marked this conversation as resolved.
Show resolved Hide resolved
)


def _set_async_cluster_db_data(span, async_redis_cluster_instance):
# type: (Span, AsyncRedisCluster[Any]) -> None
default_node = async_redis_cluster_instance.get_default_node()
if default_node is not None and default_node.connection_kwargs is not None:
_set_db_data_on_span(span, default_node.connection_kwargs)


def _set_async_cluster_pipeline_db_data(span, async_redis_cluster_pipeline_instance):
# type: (Span, AsyncClusterPipeline[Any]) -> None
_set_async_cluster_db_data(
span,
# the AsyncClusterPipeline has always had a `_client` attr but it is private so potentially problematic and mypy
# does not recognize it - see https://github.com/redis/redis-py/blame/v5.0.0/redis/asyncio/cluster.py#L1386
async_redis_cluster_pipeline_instance._client, # type: ignore[attr-defined]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a way we could try to avoid using _client here, and instead access what we need through a public API? I would strongly prefer to avoid using an external library's private API, since the private API could change at any time and cause problems in the future.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, I don't believe there is a way we can avoid using _client. We can use a try/except here to be safe and log when it is unavailable. Is there a standard way to raise sentry internal warnings?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, in that case, we can use capture_internal_exceptions here. I also checked that one of your unit tests is verifying this function, so we will get a test failure if _client ever gets removed

)
szokeasaurusrex marked this conversation as resolved.
Show resolved Hide resolved


def patch_redis_pipeline(pipeline_cls, is_cluster, get_command_args_fn, set_db_data_fn):
# type: (Any, bool, Any, Callable[[Span, Any], None]) -> None
old_execute = pipeline_cls.execute

def sentry_patched_execute(self, *args, **kwargs):
Expand All @@ -150,12 +189,12 @@
op=OP.DB_REDIS, description="redis.pipeline.execute"
) as span:
with capture_internal_exceptions():
_set_db_data(span, self.connection_pool.connection_kwargs)
set_db_data_fn(span, self)
_set_pipeline_data(
span,
is_cluster,
get_command_args_fn,
self.transaction,
False if is_cluster else self.transaction,
self.command_stack,
)

Expand All @@ -164,8 +203,8 @@
pipeline_cls.execute = sentry_patched_execute


def patch_redis_client(cls, is_cluster):
# type: (Any, bool) -> None
def patch_redis_client(cls, is_cluster, set_db_data_fn):
# type: (Any, bool, Callable[[Span, Any], None]) -> None
"""
This function can be used to instrument custom redis client classes or
subclasses.
Expand All @@ -189,11 +228,7 @@
description = description[: integration.max_data_size - len("...")] + "..."

with hub.start_span(op=OP.DB_REDIS, description=description) as span:
try:
_set_db_data(span, self.connection_pool.connection_kwargs)
except AttributeError:
pass # connections_kwargs may be missing in some cases

set_db_data_fn(span, self)
szokeasaurusrex marked this conversation as resolved.
Show resolved Hide resolved
_set_client_data(span, is_cluster, name, *args)

return old_execute_command(self, name, *args, **kwargs)
Expand All @@ -203,14 +238,16 @@

def _patch_redis(StrictRedis, client): # noqa: N803
# type: (Any, Any) -> None
patch_redis_client(StrictRedis, is_cluster=False)
patch_redis_pipeline(client.Pipeline, False, _get_redis_command_args)
patch_redis_client(StrictRedis, is_cluster=False, set_db_data_fn=_set_db_data)
patch_redis_pipeline(client.Pipeline, False, _get_redis_command_args, _set_db_data)
try:
strict_pipeline = client.StrictPipeline
except AttributeError:
pass
else:
patch_redis_pipeline(strict_pipeline, False, _get_redis_command_args)
patch_redis_pipeline(
strict_pipeline, False, _get_redis_command_args, _set_db_data
)

try:
import redis.asyncio
Expand All @@ -222,8 +259,56 @@
patch_redis_async_pipeline,
)

patch_redis_async_client(redis.asyncio.client.StrictRedis)
patch_redis_async_pipeline(redis.asyncio.client.Pipeline)
patch_redis_async_client(
redis.asyncio.client.StrictRedis,
is_cluster=False,
set_db_data_fn=_set_db_data,
)
patch_redis_async_pipeline(
redis.asyncio.client.Pipeline,
False,
_get_redis_command_args,
set_db_data_fn=_set_db_data,
)


def _patch_redis_cluster():
# type: () -> None
"""Patches the cluster module on redis SDK (as opposed to rediscluster library)"""
try:
from redis import RedisCluster, cluster
except ImportError:
pass
else:
patch_redis_client(RedisCluster, True, _set_cluster_db_data)
patch_redis_pipeline(
cluster.ClusterPipeline,
True,
_parse_rediscluster_command,
_set_cluster_db_data,
)

try:
from redis.asyncio import cluster as async_cluster
except ImportError:
pass
else:
from sentry_sdk.integrations.redis.asyncio import (
patch_redis_async_client,
patch_redis_async_pipeline,
)

patch_redis_async_client(
async_cluster.RedisCluster,
is_cluster=True,
set_db_data_fn=_set_async_cluster_db_data,
)
patch_redis_async_pipeline(
async_cluster.ClusterPipeline,
True,
_parse_rediscluster_command,
set_db_data_fn=_set_async_cluster_pipeline_db_data,
)


def _patch_rb():
Expand All @@ -233,9 +318,15 @@
except ImportError:
pass
else:
patch_redis_client(rb.clients.FanoutClient, is_cluster=False)
patch_redis_client(rb.clients.MappingClient, is_cluster=False)
patch_redis_client(rb.clients.RoutingClient, is_cluster=False)
patch_redis_client(

Check warning on line 321 in sentry_sdk/integrations/redis/__init__.py

View check run for this annotation

Codecov / codecov/patch

sentry_sdk/integrations/redis/__init__.py#L321

Added line #L321 was not covered by tests
rb.clients.FanoutClient, is_cluster=False, set_db_data_fn=_set_db_data
)
patch_redis_client(

Check warning on line 324 in sentry_sdk/integrations/redis/__init__.py

View check run for this annotation

Codecov / codecov/patch

sentry_sdk/integrations/redis/__init__.py#L324

Added line #L324 was not covered by tests
rb.clients.MappingClient, is_cluster=False, set_db_data_fn=_set_db_data
)
patch_redis_client(

Check warning on line 327 in sentry_sdk/integrations/redis/__init__.py

View check run for this annotation

Codecov / codecov/patch

sentry_sdk/integrations/redis/__init__.py#L327

Added line #L327 was not covered by tests
rb.clients.RoutingClient, is_cluster=False, set_db_data_fn=_set_db_data
)


def _patch_rediscluster():
Expand All @@ -245,7 +336,9 @@
except ImportError:
return

patch_redis_client(rediscluster.RedisCluster, is_cluster=True)
patch_redis_client(
rediscluster.RedisCluster, is_cluster=True, set_db_data_fn=_set_db_data
)

# up to v1.3.6, __version__ attribute is a tuple
# from v2.0.0, __version__ is a string and VERSION a tuple
Expand All @@ -255,11 +348,17 @@
# https://github.com/Grokzen/redis-py-cluster/blob/master/docs/release-notes.rst
if (0, 2, 0) < version < (2, 0, 0):
pipeline_cls = rediscluster.pipeline.StrictClusterPipeline
patch_redis_client(rediscluster.StrictRedisCluster, is_cluster=True)
patch_redis_client(
rediscluster.StrictRedisCluster,
is_cluster=True,
set_db_data_fn=_set_db_data,
)
else:
pipeline_cls = rediscluster.pipeline.ClusterPipeline

patch_redis_pipeline(pipeline_cls, True, _parse_rediscluster_command)
patch_redis_pipeline(
pipeline_cls, True, _parse_rediscluster_command, set_db_data_fn=_set_db_data
)


class RedisIntegration(Integration):
Expand All @@ -278,6 +377,7 @@
raise DidNotEnable("Redis client not installed")

_patch_redis(StrictRedis, client)
_patch_redis_cluster()
_patch_rb()

try:
Expand Down
36 changes: 20 additions & 16 deletions sentry_sdk/integrations/redis/asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,25 @@
from sentry_sdk.consts import OP
from sentry_sdk.integrations.redis import (
RedisIntegration,
_get_redis_command_args,
_get_span_description,
_set_client_data,
_set_db_data,
_set_pipeline_data,
)
from sentry_sdk._types import TYPE_CHECKING
from sentry_sdk.tracing import Span
from sentry_sdk.utils import capture_internal_exceptions

if TYPE_CHECKING:
from typing import Any
from collections.abc import Callable
from typing import Any, Union
from redis.asyncio.client import Pipeline, StrictRedis
from redis.asyncio.cluster import ClusterPipeline, RedisCluster

Check warning on line 19 in sentry_sdk/integrations/redis/asyncio.py

View check run for this annotation

Codecov / codecov/patch

sentry_sdk/integrations/redis/asyncio.py#L16-L19

Added lines #L16 - L19 were not covered by tests


def patch_redis_async_pipeline(pipeline_cls):
# type: (Any) -> None
def patch_redis_async_pipeline(
pipeline_cls, is_cluster, get_command_args_fn, set_db_data_fn
):
# type: (Union[type[Pipeline[Any]], type[ClusterPipeline[Any]]], bool, Any, Callable[[Span, Any], None]) -> None
old_execute = pipeline_cls.execute

async def _sentry_execute(self, *args, **kwargs):
Expand All @@ -32,22 +36,22 @@
op=OP.DB_REDIS, description="redis.pipeline.execute"
) as span:
with capture_internal_exceptions():
_set_db_data(span, self.connection_pool.connection_kwargs)
set_db_data_fn(span, self)
_set_pipeline_data(
span,
False,
_get_redis_command_args,
self.is_transaction,
self.command_stack,
is_cluster,
get_command_args_fn,
False if is_cluster else self.is_transaction,
self._command_stack if is_cluster else self.command_stack,
)

return await old_execute(self, *args, **kwargs)

pipeline_cls.execute = _sentry_execute
pipeline_cls.execute = _sentry_execute # type: ignore[method-assign]


def patch_redis_async_client(cls):
# type: (Any) -> None
def patch_redis_async_client(cls, is_cluster, set_db_data_fn):
# type: (Union[type[StrictRedis[Any]], type[RedisCluster[Any]]], bool, Callable[[Span, Any], None]) -> None
old_execute_command = cls.execute_command

async def _sentry_execute_command(self, name, *args, **kwargs):
Expand All @@ -60,9 +64,9 @@
description = _get_span_description(name, *args)

with hub.start_span(op=OP.DB_REDIS, description=description) as span:
_set_db_data(span, self.connection_pool.connection_kwargs)
_set_client_data(span, False, name, *args)
set_db_data_fn(span, self)
_set_client_data(span, is_cluster, name, *args)

return await old_execute_command(self, name, *args, **kwargs)

cls.execute_command = _sentry_execute_command
cls.execute_command = _sentry_execute_command # type: ignore[method-assign]
3 changes: 3 additions & 0 deletions tests/integrations/redis/cluster/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import pytest

pytest.importorskip("redis.cluster")