Skip to content

Commit

Permalink
feat(integrations): add support for cluster clients from redis sdk
Browse files Browse the repository at this point in the history
  • Loading branch information
md384 committed Oct 3, 2023
1 parent afc488d commit f798d60
Show file tree
Hide file tree
Showing 6 changed files with 229 additions and 15 deletions.
39 changes: 34 additions & 5 deletions sentry_sdk/integrations/redis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,7 @@ def _set_pipeline_data(
):
# type: (Span, bool, Any, bool, Sequence[Any]) -> None
span.set_tag("redis.is_cluster", is_cluster)
transaction = is_transaction if not is_cluster else False
span.set_tag("redis.transaction", transaction)
span.set_tag("redis.transaction", is_transaction)

commands = []
for i, arg in enumerate(command_stack):
Expand Down Expand Up @@ -103,7 +102,7 @@ def sentry_patched_execute(self, *args, **kwargs):
span,
is_cluster,
get_command_args_fn,
self.transaction,
False if is_cluster else self.transaction,
self.command_stack,
)
span.set_data(SPANDATA.DB_SYSTEM, "redis")
Expand Down Expand Up @@ -144,8 +143,37 @@ def _patch_redis(StrictRedis, client): # noqa: N803
patch_redis_async_pipeline,
)

patch_redis_async_client(redis.asyncio.client.StrictRedis)
patch_redis_async_pipeline(redis.asyncio.client.Pipeline)
patch_redis_async_client(redis.asyncio.client.StrictRedis, is_cluster=False)
patch_redis_async_pipeline(
redis.asyncio.client.Pipeline, False, _get_redis_command_args
)


def _patch_redis_cluster():
# type: () -> None
"""Patches the cluster module on redis SDK (as opposed to rediscluster library)"""
try:
from redis import RedisCluster, cluster
except ImportError:
pass

Check warning on line 158 in sentry_sdk/integrations/redis/__init__.py

View check run for this annotation

Codecov / codecov/patch

sentry_sdk/integrations/redis/__init__.py#L157-L158

Added lines #L157 - L158 were not covered by tests
else:
patch_redis_client(RedisCluster, True)
patch_redis_pipeline(cluster.ClusterPipeline, True, _parse_rediscluster_command)

try:
from redis.asyncio import cluster as async_cluster
except ImportError:
pass
else:
from sentry_sdk.integrations.redis.asyncio import (
patch_redis_async_client,
patch_redis_async_pipeline,
)

patch_redis_async_client(async_cluster.RedisCluster, is_cluster=True)
patch_redis_async_pipeline(
async_cluster.ClusterPipeline, True, _parse_rediscluster_command
)


def _patch_rb():
Expand Down Expand Up @@ -200,6 +228,7 @@ def setup_once():
raise DidNotEnable("Redis client not installed")

_patch_redis(StrictRedis, client)
_patch_redis_cluster()
_patch_rb()

try:
Expand Down
19 changes: 9 additions & 10 deletions sentry_sdk/integrations/redis/asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from sentry_sdk.utils import capture_internal_exceptions
from sentry_sdk.integrations.redis import (
RedisIntegration,
_get_redis_command_args,
_get_span_description,
_set_client_data,
_set_pipeline_data,
Expand All @@ -18,8 +17,8 @@
from typing import Any


def patch_redis_async_pipeline(pipeline_cls):
# type: (Any) -> None
def patch_redis_async_pipeline(pipeline_cls, is_cluster, get_command_args_fn):
# type: (Any, bool, Any) -> None
old_execute = pipeline_cls.execute

async def _sentry_execute(self, *args, **kwargs):
Expand All @@ -35,19 +34,19 @@ async def _sentry_execute(self, *args, **kwargs):
with capture_internal_exceptions():
_set_pipeline_data(
span,
False,
_get_redis_command_args,
self.is_transaction,
self.command_stack,
is_cluster,
get_command_args_fn,
False if is_cluster else self.is_transaction,
self._command_stack if is_cluster else self.command_stack,
)

return await old_execute(self, *args, **kwargs)

pipeline_cls.execute = _sentry_execute


def patch_redis_async_client(cls):
# type: (Any) -> None
def patch_redis_async_client(cls, is_cluster):
# type: (Any, bool) -> None
old_execute_command = cls.execute_command

async def _sentry_execute_command(self, name, *args, **kwargs):
Expand All @@ -60,7 +59,7 @@ async def _sentry_execute_command(self, name, *args, **kwargs):
description = _get_span_description(name, *args)

with hub.start_span(op=OP.DB_REDIS, description=description) as span:
_set_client_data(span, False, name, *args)
_set_client_data(span, is_cluster, name, *args)

return await old_execute_command(self, name, *args, **kwargs)

Expand Down
3 changes: 3 additions & 0 deletions tests/integrations/redis/cluster/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import pytest

pytest.importorskip("redis.cluster")
84 changes: 84 additions & 0 deletions tests/integrations/redis/cluster/test_redis_cluster.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import pytest
from sentry_sdk import capture_message
from sentry_sdk.consts import SPANDATA
from sentry_sdk.api import start_transaction
from sentry_sdk.integrations.redis import RedisIntegration

import redis


@pytest.fixture(autouse=True)
def monkeypatch_rediscluster_class(reset_integrations):
pipeline_cls = redis.cluster.ClusterPipeline
redis.cluster.NodesManager.initialize = lambda *_, **__: None
redis.RedisCluster.command = lambda *_: []
redis.RedisCluster.pipeline = lambda *_, **__: pipeline_cls(None, None)
pipeline_cls.execute = lambda *_, **__: None
redis.RedisCluster.execute_command = lambda *_, **__: None


def test_rediscluster_basic(sentry_init, capture_events):
sentry_init(integrations=[RedisIntegration()])
events = capture_events()

rc = redis.RedisCluster(host="localhost", port=6379)
rc.get("foobar")
capture_message("hi")

(event,) = events
(crumb,) = event["breadcrumbs"]["values"]

assert crumb == {
"category": "redis",
"message": "GET 'foobar'",
"data": {
"db.operation": "GET",
"redis.key": "foobar",
"redis.command": "GET",
"redis.is_cluster": True,
},
"timestamp": crumb["timestamp"],
"type": "redis",
}


@pytest.mark.parametrize(
"send_default_pii, expected_first_ten",
[
(False, ["GET 'foo'", "SET 'bar' [Filtered]", "SET 'baz' [Filtered]"]),
(True, ["GET 'foo'", "SET 'bar' 1", "SET 'baz' 2"]),
],
)
def test_rediscluster_pipeline(
sentry_init, capture_events, send_default_pii, expected_first_ten
):
sentry_init(
integrations=[RedisIntegration()],
traces_sample_rate=1.0,
send_default_pii=send_default_pii,
)
events = capture_events()

rc = redis.RedisCluster(host="localhost", port=6379)
with start_transaction():
pipeline = rc.pipeline()
pipeline.get("foo")
pipeline.set("bar", 1)
pipeline.set("baz", 2)
pipeline.execute()

(event,) = events
(span,) = event["spans"]
assert span["op"] == "db.redis"
assert span["description"] == "redis.pipeline.execute"
assert span["data"] == {
"redis.commands": {
"count": 3,
"first_ten": expected_first_ten,
},
SPANDATA.DB_SYSTEM: "redis",
}
assert span["tags"] == {
"redis.transaction": False, # For Cluster, this is always False
"redis.is_cluster": True,
}
3 changes: 3 additions & 0 deletions tests/integrations/redis/cluster_asyncio/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import pytest

pytest.importorskip("redis.asyncio.cluster")
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import pytest

from sentry_sdk import capture_message, start_transaction
from sentry_sdk.integrations.redis import RedisIntegration

import redis


async def fake_initialize(*_, **__):
return None


async def fake_execute_command(*_, **__):
return []


async def fake_execute(*_, **__):
return None


@pytest.fixture(autouse=True)
def monkeypatch_rediscluster_asyncio_class(reset_integrations):
pipeline_cls = redis.asyncio.cluster.ClusterPipeline
redis.asyncio.cluster.NodesManager.initialize = fake_initialize
redis.asyncio.RedisCluster.pipeline = lambda self, *_, **__: pipeline_cls(self)
pipeline_cls.execute = fake_execute
redis.asyncio.RedisCluster.execute_command = fake_execute_command


@pytest.mark.asyncio
async def test_async_basic(sentry_init, capture_events):
sentry_init(integrations=[RedisIntegration()])
events = capture_events()

connection = redis.asyncio.RedisCluster(host="localhost", port=6379)

await connection.get("foobar")
capture_message("hi")

(event,) = events
(crumb,) = event["breadcrumbs"]["values"]

assert crumb == {
"category": "redis",
"message": "GET 'foobar'",
"data": {
"db.operation": "GET",
"redis.key": "foobar",
"redis.command": "GET",
"redis.is_cluster": True,
},
"timestamp": crumb["timestamp"],
"type": "redis",
}


@pytest.mark.parametrize(
"send_default_pii, expected_first_ten",
[
(False, ["GET 'foo'", "SET 'bar' [Filtered]", "SET 'baz' [Filtered]"]),
(True, ["GET 'foo'", "SET 'bar' 1", "SET 'baz' 2"]),
],
)
@pytest.mark.asyncio
async def test_async_redis_pipeline(
sentry_init, capture_events, send_default_pii, expected_first_ten
):
sentry_init(
integrations=[RedisIntegration()],
traces_sample_rate=1.0,
send_default_pii=send_default_pii,
)
events = capture_events()

connection = redis.asyncio.RedisCluster(host="localhost", port=6379)
with start_transaction():
pipeline = connection.pipeline()
pipeline.get("foo")
pipeline.set("bar", 1)
pipeline.set("baz", 2)
await pipeline.execute()

(event,) = events
(span,) = event["spans"]
assert span["op"] == "db.redis"
assert span["description"] == "redis.pipeline.execute"
assert span["data"] == {
"redis.commands": {
"count": 3,
"first_ten": expected_first_ten,
}
}
assert span["tags"] == {
"redis.transaction": False,
"redis.is_cluster": True,
}

0 comments on commit f798d60

Please sign in to comment.