Skip to content

Commit

Permalink
feat(integrations): add support for cluster clients from redis sdk
Browse files Browse the repository at this point in the history
  • Loading branch information
md384 committed Oct 16, 2023
1 parent fee865c commit ae5bca6
Show file tree
Hide file tree
Showing 7 changed files with 407 additions and 37 deletions.
134 changes: 112 additions & 22 deletions sentry_sdk/integrations/redis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
)

if TYPE_CHECKING:
from typing import Any, Dict, Sequence
from typing import Any, Dict, Sequence, Callable

Check warning on line 16 in sentry_sdk/integrations/redis/__init__.py

View check run for this annotation

Codecov / codecov/patch

sentry_sdk/integrations/redis/__init__.py#L16

Added line #L16 was not covered by tests
from sentry_sdk.tracing import Span

_SINGLE_KEY_COMMANDS = frozenset(
Expand Down Expand Up @@ -83,8 +83,7 @@ def _set_pipeline_data(
):
# type: (Span, bool, Any, bool, Sequence[Any]) -> None
span.set_tag("redis.is_cluster", is_cluster)
transaction = is_transaction if not is_cluster else False
span.set_tag("redis.transaction", transaction)
span.set_tag("redis.transaction", is_transaction)

commands = []
for i, arg in enumerate(command_stack):
Expand Down Expand Up @@ -118,7 +117,7 @@ def _set_client_data(span, is_cluster, name, *args):
span.set_tag("redis.key", args[0])


def _set_db_data(span, connection_params):
def _set_db_data_on_span(span, connection_params):
# type: (Span, Dict[str, Any]) -> None
span.set_data(SPANDATA.DB_SYSTEM, "redis")

Expand All @@ -135,8 +134,34 @@ def _set_db_data(span, connection_params):
span.set_data(SPANDATA.SERVER_PORT, port)


def patch_redis_pipeline(pipeline_cls, is_cluster, get_command_args_fn):
# type: (Any, bool, Any) -> None
def _set_db_data(span, redis_instance):
# type: (Span, Any) -> None
_set_db_data_on_span(span, redis_instance.connection_pool.connection_kwargs)


def _set_cluster_db_data(span, redis_cluster_instance):
# type: (Span, Any) -> None
default_node = redis_cluster_instance.get_default_node()
if default_node:
_set_db_data_on_span(
span, {"host": default_node.host, "port": default_node.port}
)


def _set_async_cluster_db_data(span, async_redis_cluster_instance):
# type: (Span, Any) -> None
default_node = async_redis_cluster_instance.get_default_node()
if default_node and default_node.connection_kwargs:
_set_db_data_on_span(span, default_node.connection_kwargs)


def _set_async_cluster_pipeline_db_data(span, async_redis_cluster_pipeline_instance):
# type: (Span, Any) -> None
_set_async_cluster_db_data(span, async_redis_cluster_pipeline_instance._client)


def patch_redis_pipeline(pipeline_cls, is_cluster, get_command_args_fn, set_db_data_fn):
# type: (Any, bool, Any, Callable[[Span, Any], None]) -> None
old_execute = pipeline_cls.execute

def sentry_patched_execute(self, *args, **kwargs):
Expand All @@ -150,12 +175,12 @@ def sentry_patched_execute(self, *args, **kwargs):
op=OP.DB_REDIS, description="redis.pipeline.execute"
) as span:
with capture_internal_exceptions():
_set_db_data(span, self.connection_pool.connection_kwargs)
set_db_data_fn(span, self)
_set_pipeline_data(
span,
is_cluster,
get_command_args_fn,
self.transaction,
False if is_cluster else self.transaction,
self.command_stack,
)

Expand All @@ -164,8 +189,8 @@ def sentry_patched_execute(self, *args, **kwargs):
pipeline_cls.execute = sentry_patched_execute


def patch_redis_client(cls, is_cluster):
# type: (Any, bool) -> None
def patch_redis_client(cls, is_cluster, set_db_data_fn):
# type: (Any, bool, Callable[[Span, Any], None]) -> None
"""
This function can be used to instrument custom redis client classes or
subclasses.
Expand All @@ -189,7 +214,7 @@ def sentry_patched_execute_command(self, name, *args, **kwargs):
description = description[: integration.max_data_size - len("...")] + "..."

with hub.start_span(op=OP.DB_REDIS, description=description) as span:
_set_db_data(span, self.connection_pool.connection_kwargs)
set_db_data_fn(span, self)
_set_client_data(span, is_cluster, name, *args)

return old_execute_command(self, name, *args, **kwargs)
Expand All @@ -199,14 +224,16 @@ def sentry_patched_execute_command(self, name, *args, **kwargs):

def _patch_redis(StrictRedis, client): # noqa: N803
# type: (Any, Any) -> None
patch_redis_client(StrictRedis, is_cluster=False)
patch_redis_pipeline(client.Pipeline, False, _get_redis_command_args)
patch_redis_client(StrictRedis, is_cluster=False, set_db_data_fn=_set_db_data)
patch_redis_pipeline(client.Pipeline, False, _get_redis_command_args, _set_db_data)
try:
strict_pipeline = client.StrictPipeline
except AttributeError:
pass
else:
patch_redis_pipeline(strict_pipeline, False, _get_redis_command_args)
patch_redis_pipeline(
strict_pipeline, False, _get_redis_command_args, _set_db_data
)

try:
import redis.asyncio
Expand All @@ -218,8 +245,56 @@ def _patch_redis(StrictRedis, client): # noqa: N803
patch_redis_async_pipeline,
)

patch_redis_async_client(redis.asyncio.client.StrictRedis)
patch_redis_async_pipeline(redis.asyncio.client.Pipeline)
patch_redis_async_client(
redis.asyncio.client.StrictRedis,
is_cluster=False,
set_db_data_fn=_set_db_data,
)
patch_redis_async_pipeline(
redis.asyncio.client.Pipeline,
False,
_get_redis_command_args,
set_db_data_fn=_set_db_data,
)


def _patch_redis_cluster():
# type: () -> None
"""Patches the cluster module on redis SDK (as opposed to rediscluster library)"""
try:
from redis import RedisCluster, cluster
except ImportError:
pass
else:
patch_redis_client(RedisCluster, True, _set_cluster_db_data)
patch_redis_pipeline(
cluster.ClusterPipeline,
True,
_parse_rediscluster_command,
_set_cluster_db_data,
)

try:
from redis.asyncio import cluster as async_cluster
except ImportError:
pass
else:
from sentry_sdk.integrations.redis.asyncio import (
patch_redis_async_client,
patch_redis_async_pipeline,
)

patch_redis_async_client(
async_cluster.RedisCluster,
is_cluster=True,
set_db_data_fn=_set_async_cluster_db_data,
)
patch_redis_async_pipeline(
async_cluster.ClusterPipeline,
True,
_parse_rediscluster_command,
set_db_data_fn=_set_async_cluster_pipeline_db_data,
)


def _patch_rb():
Expand All @@ -229,9 +304,15 @@ def _patch_rb():
except ImportError:
pass
else:
patch_redis_client(rb.clients.FanoutClient, is_cluster=False)
patch_redis_client(rb.clients.MappingClient, is_cluster=False)
patch_redis_client(rb.clients.RoutingClient, is_cluster=False)
patch_redis_client(

Check warning on line 307 in sentry_sdk/integrations/redis/__init__.py

View check run for this annotation

Codecov / codecov/patch

sentry_sdk/integrations/redis/__init__.py#L307

Added line #L307 was not covered by tests
rb.clients.FanoutClient, is_cluster=False, set_db_data_fn=_set_db_data
)
patch_redis_client(

Check warning on line 310 in sentry_sdk/integrations/redis/__init__.py

View check run for this annotation

Codecov / codecov/patch

sentry_sdk/integrations/redis/__init__.py#L310

Added line #L310 was not covered by tests
rb.clients.MappingClient, is_cluster=False, set_db_data_fn=_set_db_data
)
patch_redis_client(

Check warning on line 313 in sentry_sdk/integrations/redis/__init__.py

View check run for this annotation

Codecov / codecov/patch

sentry_sdk/integrations/redis/__init__.py#L313

Added line #L313 was not covered by tests
rb.clients.RoutingClient, is_cluster=False, set_db_data_fn=_set_db_data
)


def _patch_rediscluster():
Expand All @@ -241,7 +322,9 @@ def _patch_rediscluster():
except ImportError:
return

patch_redis_client(rediscluster.RedisCluster, is_cluster=True)
patch_redis_client(
rediscluster.RedisCluster, is_cluster=True, set_db_data_fn=_set_db_data
)

# up to v1.3.6, __version__ attribute is a tuple
# from v2.0.0, __version__ is a string and VERSION a tuple
Expand All @@ -251,11 +334,17 @@ def _patch_rediscluster():
# https://github.com/Grokzen/redis-py-cluster/blob/master/docs/release-notes.rst
if (0, 2, 0) < version < (2, 0, 0):
pipeline_cls = rediscluster.pipeline.StrictClusterPipeline
patch_redis_client(rediscluster.StrictRedisCluster, is_cluster=True)
patch_redis_client(
rediscluster.StrictRedisCluster,
is_cluster=True,
set_db_data_fn=_set_db_data,
)
else:
pipeline_cls = rediscluster.pipeline.ClusterPipeline

patch_redis_pipeline(pipeline_cls, True, _parse_rediscluster_command)
patch_redis_pipeline(
pipeline_cls, True, _parse_rediscluster_command, set_db_data_fn=_set_db_data
)


class RedisIntegration(Integration):
Expand All @@ -274,6 +363,7 @@ def setup_once():
raise DidNotEnable("Redis client not installed")

_patch_redis(StrictRedis, client)
_patch_redis_cluster()
_patch_rb()

try:
Expand Down
29 changes: 15 additions & 14 deletions sentry_sdk/integrations/redis/asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,22 @@
from sentry_sdk.consts import OP
from sentry_sdk.integrations.redis import (
RedisIntegration,
_get_redis_command_args,
_get_span_description,
_set_client_data,
_set_db_data,
_set_pipeline_data,
)
from sentry_sdk._types import TYPE_CHECKING
from sentry_sdk.tracing import Span
from sentry_sdk.utils import capture_internal_exceptions

if TYPE_CHECKING:
from typing import Any
from typing import Any, Callable

Check warning on line 16 in sentry_sdk/integrations/redis/asyncio.py

View check run for this annotation

Codecov / codecov/patch

sentry_sdk/integrations/redis/asyncio.py#L16

Added line #L16 was not covered by tests


def patch_redis_async_pipeline(pipeline_cls):
# type: (Any) -> None
def patch_redis_async_pipeline(
pipeline_cls, is_cluster, get_command_args_fn, set_db_data_fn
):
# type: (Any, bool, Any, Callable[[Span, Any], None]) -> None
old_execute = pipeline_cls.execute

async def _sentry_execute(self, *args, **kwargs):
Expand All @@ -32,22 +33,22 @@ async def _sentry_execute(self, *args, **kwargs):
op=OP.DB_REDIS, description="redis.pipeline.execute"
) as span:
with capture_internal_exceptions():
_set_db_data(span, self.connection_pool.connection_kwargs)
set_db_data_fn(span, self)
_set_pipeline_data(
span,
False,
_get_redis_command_args,
self.is_transaction,
self.command_stack,
is_cluster,
get_command_args_fn,
False if is_cluster else self.is_transaction,
self._command_stack if is_cluster else self.command_stack,
)

return await old_execute(self, *args, **kwargs)

pipeline_cls.execute = _sentry_execute


def patch_redis_async_client(cls):
# type: (Any) -> None
def patch_redis_async_client(cls, is_cluster, set_db_data_fn):
# type: (Any, bool, Callable[[Span, Any], None]) -> None
old_execute_command = cls.execute_command

async def _sentry_execute_command(self, name, *args, **kwargs):
Expand All @@ -60,8 +61,8 @@ async def _sentry_execute_command(self, name, *args, **kwargs):
description = _get_span_description(name, *args)

with hub.start_span(op=OP.DB_REDIS, description=description) as span:
_set_db_data(span, self.connection_pool.connection_kwargs)
_set_client_data(span, False, name, *args)
set_db_data_fn(span, self)
_set_client_data(span, is_cluster, name, *args)

return await old_execute_command(self, name, *args, **kwargs)

Expand Down
6 changes: 5 additions & 1 deletion tests/integrations/redis/asyncio/test_redis_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@

@pytest.mark.asyncio
async def test_async_basic(sentry_init, capture_events):
sentry_init(integrations=[RedisIntegration()])
sentry_init(
integrations=[RedisIntegration()],
traces_sample_rate=1.0,
send_default_pii=True,
)
events = capture_events()

connection = FakeRedis()
Expand Down
3 changes: 3 additions & 0 deletions tests/integrations/redis/cluster/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import pytest

pytest.importorskip("redis.cluster")

0 comments on commit ae5bca6

Please sign in to comment.