Skip to content

Commit

Permalink
feat(celery): Send queue name to Sentry
Browse files Browse the repository at this point in the history
Send the queue name to Sentry for Celery tasks using the default exchange. The queue name is sent as span data with the key `messaging.destination.name`.

Ref GH-2961
  • Loading branch information
szokeasaurusrex committed Apr 30, 2024
1 parent 9cf6377 commit a70685e
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 4 deletions.
24 changes: 21 additions & 3 deletions sentry_sdk/integrations/celery/__init__.py
Expand Up @@ -306,20 +306,38 @@ def _inner(*args, **kwargs):
return _inner # type: ignore


def _set_messaging_destination_name(task, span):
# type: (Any, Span) -> None
"""Set "messaging.destination.name" tag for span"""
with capture_internal_exceptions():
delivery_info = task.request.delivery_info
routing_key = delivery_info.get("routing_key")
if delivery_info.get("exchange") == "" and routing_key is not None:
# Empty exchange indicates the default exchange, meaning the tasks
# are sent to the queue with the same name as the routing key.
span.set_data("messaging.destination.name", routing_key)


def _wrap_task_call(task, f):
# type: (Any, F) -> F

# Need to wrap task call because the exception is caught before we get to
# see it. Also celery's reported stacktrace is untrustworthy.

# functools.wraps is important here because celery-once looks at this
# method's name.
# method's name. @ensure_integration_enabled internally calls functools.wraps,
# but if we ever remove the @ensure_integration_enabled decorator, we need
# to add @functools.wraps(f) here.
# https://github.com/getsentry/sentry-python/issues/421
@wraps(f)
@ensure_integration_enabled(CeleryIntegration, f)
def _inner(*args, **kwargs):
# type: (*Any, **Any) -> Any
try:
return f(*args, **kwargs)
with sentry_sdk.start_span(
op=OP.QUEUE_TASK_CELERY, description=task.name
) as span:
_set_messaging_destination_name(task, span)
return f(*args, **kwargs)
except Exception:
exc_info = sys.exc_info()
with capture_internal_exceptions():
Expand Down
12 changes: 11 additions & 1 deletion tests/integrations/celery/test_celery.py
Expand Up @@ -209,7 +209,17 @@ def dummy_task(x, y):
else:
assert execution_event["contexts"]["trace"]["status"] == "ok"

assert execution_event["spans"] == []
assert len(execution_event["spans"]) == 1
assert (
execution_event["spans"][0].items()
>= {
"trace_id": str(transaction.trace_id),
"same_process_as_parent": True,
"op": "queue.task.celery",
"description": "dummy_task",
"data": ApproxDict(),
}.items()
)
assert submission_event["spans"] == [
{
"data": ApproxDict(),
Expand Down

0 comments on commit a70685e

Please sign in to comment.