From a70685ebd3c65da101e5a5d34de06bc9e4e3bd45 Mon Sep 17 00:00:00 2001 From: Daniel Szoke Date: Tue, 16 Apr 2024 17:34:00 +0200 Subject: [PATCH] feat(celery): Send queue name to Sentry Send the queue name to Sentry for Celery tasks using the default exchange. The queue name is sent as span data with the key `messaging.destination.name`. Ref GH-2961 --- sentry_sdk/integrations/celery/__init__.py | 24 +++++++++++++++++++--- tests/integrations/celery/test_celery.py | 12 ++++++++++- 2 files changed, 32 insertions(+), 4 deletions(-) diff --git a/sentry_sdk/integrations/celery/__init__.py b/sentry_sdk/integrations/celery/__init__.py index 74205a0184..ea217cf6a6 100644 --- a/sentry_sdk/integrations/celery/__init__.py +++ b/sentry_sdk/integrations/celery/__init__.py @@ -306,6 +306,18 @@ def _inner(*args, **kwargs): return _inner # type: ignore +def _set_messaging_destination_name(task, span): + # type: (Any, Span) -> None + """Set "messaging.destination.name" tag for span""" + with capture_internal_exceptions(): + delivery_info = task.request.delivery_info + routing_key = delivery_info.get("routing_key") + if delivery_info.get("exchange") == "" and routing_key is not None: + # Empty exchange indicates the default exchange, meaning the tasks + # are sent to the queue with the same name as the routing key. + span.set_data("messaging.destination.name", routing_key) + + def _wrap_task_call(task, f): # type: (Any, F) -> F @@ -313,13 +325,19 @@ def _wrap_task_call(task, f): # see it. Also celery's reported stacktrace is untrustworthy. # functools.wraps is important here because celery-once looks at this - # method's name. + # method's name. @ensure_integration_enabled internally calls functools.wraps, + # but if we ever remove the @ensure_integration_enabled decorator, we need + # to add @functools.wraps(f) here. # https://github.com/getsentry/sentry-python/issues/421 - @wraps(f) + @ensure_integration_enabled(CeleryIntegration, f) def _inner(*args, **kwargs): # type: (*Any, **Any) -> Any try: - return f(*args, **kwargs) + with sentry_sdk.start_span( + op=OP.QUEUE_TASK_CELERY, description=task.name + ) as span: + _set_messaging_destination_name(task, span) + return f(*args, **kwargs) except Exception: exc_info = sys.exc_info() with capture_internal_exceptions(): diff --git a/tests/integrations/celery/test_celery.py b/tests/integrations/celery/test_celery.py index 708294cf7e..91f0f8e341 100644 --- a/tests/integrations/celery/test_celery.py +++ b/tests/integrations/celery/test_celery.py @@ -209,7 +209,17 @@ def dummy_task(x, y): else: assert execution_event["contexts"]["trace"]["status"] == "ok" - assert execution_event["spans"] == [] + assert len(execution_event["spans"]) == 1 + assert ( + execution_event["spans"][0].items() + >= { + "trace_id": str(transaction.trace_id), + "same_process_as_parent": True, + "op": "queue.task.celery", + "description": "dummy_task", + "data": ApproxDict(), + }.items() + ) assert submission_event["spans"] == [ { "data": ApproxDict(),