From a02eb9c26badfe0d11429d018399455f4394fef7 Mon Sep 17 00:00:00 2001 From: Daniel Szoke Date: Fri, 10 May 2024 14:25:20 +0200 Subject: [PATCH] feat(celery): Set "messaging.system" on span Set the "messaging.system" data on the "queue.process" span in the Celery integration. The messaging.system span data attribute should be set to the Celery broker being used, e.g. "amqp" for RabbitMQ, "redis" for Redis, and "sqs" for Amazon SQS. Also, add tests for this feature. ref #2951 --- sentry_sdk/consts.py | 5 +++++ sentry_sdk/integrations/celery/__init__.py | 5 +++++ tests/integrations/celery/test_celery.py | 20 +++++++++++++++++++- 3 files changed, 29 insertions(+), 1 deletion(-) diff --git a/sentry_sdk/consts.py b/sentry_sdk/consts.py index f4a0c7ca4c..6648913e28 100644 --- a/sentry_sdk/consts.py +++ b/sentry_sdk/consts.py @@ -280,6 +280,11 @@ class SPANDATA: Number of retries/attempts to process a message. """ + MESSAGING_SYSTEM = "messaging.system" + """ + The messaging system's name, e.g. `kafka`, `aws_sqs` + """ + SERVER_ADDRESS = "server.address" """ Name of the database host. diff --git a/sentry_sdk/integrations/celery/__init__.py b/sentry_sdk/integrations/celery/__init__.py index 6d118a6b44..521d37dc86 100644 --- a/sentry_sdk/integrations/celery/__init__.py +++ b/sentry_sdk/integrations/celery/__init__.py @@ -362,6 +362,11 @@ def _inner(*args, **kwargs): span.set_data( SPANDATA.MESSAGING_MESSAGE_RETRY_COUNT, task.request.retries ) + with capture_internal_exceptions(): + span.set_data( + SPANDATA.MESSAGING_SYSTEM, + task.app.connection().transport.driver_type, + ) return f(*args, **kwargs) except Exception: diff --git a/tests/integrations/celery/test_celery.py b/tests/integrations/celery/test_celery.py index 197e692461..4f71d84809 100644 --- a/tests/integrations/celery/test_celery.py +++ b/tests/integrations/celery/test_celery.py @@ -28,7 +28,7 @@ def init_celery(sentry_init, request): def inner(propagate_traces=True, backend="always_eager", **kwargs): sentry_init( integrations=[CeleryIntegration(propagate_traces=propagate_traces)], - **kwargs + **kwargs, ) celery = Celery(__name__) @@ -704,3 +704,21 @@ def task(): ... (event,) = events (span,) = event["spans"] assert span["data"]["messaging.message.retry.count"] == 3 + + +@pytest.mark.parametrize("system", ("redis", "amqp")) +def test_messaging_system(system, init_celery, capture_events): + celery = init_celery(enable_tracing=True) + events = capture_events() + + # Does not need to be a real URL, since we use always eager + celery.conf.broker_url = f"{system}://example.com" # noqa: E231 + + @celery.task() + def task(): ... + + task.apply_async() + + (event,) = events + (span,) = event["spans"] + assert span["data"]["messaging.system"] == system