Skip to content

Commit

Permalink
feat(profiling): Add thread data to spans
Browse files Browse the repository at this point in the history
As per getsentry/rfc#75, this adds the thread data to the spans. This will be
needed for the continuous profiling mode in #2830.
  • Loading branch information
Zylphrex committed Mar 18, 2024
1 parent 16d25e2 commit f44dec5
Show file tree
Hide file tree
Showing 6 changed files with 163 additions and 89 deletions.
12 changes: 12 additions & 0 deletions sentry_sdk/consts.py
Expand Up @@ -191,6 +191,18 @@ class SPANDATA:
Example: "http.handler"
"""

THREAD_ID = "thread.id"
"""
The thread id within which the span was started. This should be a string.
Example: "7972576320"
"""

THREAD_NAME = "thread.name"
"""
The thread name within which the span was started. This should be a string.
Example: "MainThread"
"""


class OP:
CACHE_GET_ITEM = "cache.get_item"
Expand Down
70 changes: 5 additions & 65 deletions sentry_sdk/profiler.py
Expand Up @@ -42,6 +42,8 @@
from sentry_sdk.utils import (
capture_internal_exception,
filename_for_module,
get_current_thread_meta,
is_gevent,
is_valid_sample_rate,
logger,
nanosecond_time,
Expand Down Expand Up @@ -126,32 +128,16 @@


try:
from gevent import get_hub as get_gevent_hub # type: ignore
from gevent.monkey import get_original, is_module_patched # type: ignore
from gevent.monkey import get_original # type: ignore
from gevent.threadpool import ThreadPool # type: ignore

thread_sleep = get_original("time", "sleep")
except ImportError:

def get_gevent_hub():
# type: () -> Any
return None

thread_sleep = time.sleep

def is_module_patched(*args, **kwargs):
# type: (*Any, **Any) -> bool
# unable to import from gevent means no modules have been patched
return False

ThreadPool = None


def is_gevent():
    # type: () -> bool
    """
    Report whether ``is_module_patched`` considers either the ``threading``
    or the ``_thread`` module to be patched.
    """
    threading_patched = is_module_patched("threading")
    if threading_patched:
        return threading_patched
    return is_module_patched("_thread")


_scheduler = None # type: Optional[Scheduler]

# The default sampling frequency to use. This is set at 101 in order to
Expand Down Expand Up @@ -389,52 +375,6 @@ def get_frame_name(frame):
MAX_PROFILE_DURATION_NS = int(3e10) # 30 seconds


def get_current_thread_id(thread=None):
    # type: (Optional[threading.Thread]) -> Optional[int]
    """
    Resolve a thread id, trying several sources in order.

    The sources are, in priority order: the explicitly passed ``thread``,
    the gevent hub (only when ``is_gevent()`` is true), the current thread,
    and finally the main thread. Returns ``None`` when none of them yields
    an id.
    """

    # an explicitly supplied thread always wins
    if thread is not None:
        explicit_ident = getattr(thread, "ident", None)
        if explicit_ident is not None:
            return explicit_ident

    # under gevent the hub is consulted before the threading module, as the
    # id there differs from what the threading module reports
    hub = get_gevent_hub() if is_gevent() else None
    if hub is not None:
        try:
            # this is undocumented, so wrap it in try except to be safe
            return hub.thread_ident
        except AttributeError:
            pass

    # prefer the current thread's id; if that is unavailable, fall back to
    # the main thread's id
    for accessor_name in ("current_thread", "main_thread"):
        try:
            ident = getattr(threading, accessor_name)().ident
        except AttributeError:
            continue
        if ident is not None:
            return ident

    # every source has been exhausted
    return None


class Profile(object):
def __init__(
self,
Expand All @@ -456,7 +396,7 @@ def __init__(

# Various framework integrations are capable of overwriting the active thread id.
# If it is set to `None` at the end of the profile, we fall back to the default.
self._default_active_thread_id = get_current_thread_id() or 0 # type: int
self._default_active_thread_id = get_current_thread_meta()[0] or 0 # type: int
self.active_thread_id = None # type: Optional[int]

try:
Expand All @@ -479,7 +419,7 @@ def __init__(

def update_active_thread_id(self):
# type: () -> None
self.active_thread_id = get_current_thread_id()
self.active_thread_id = get_current_thread_meta()[0]
logger.debug(
"[Profiling] updating active thread id to {tid}".format(
tid=self.active_thread_id
Expand Down
19 changes: 18 additions & 1 deletion sentry_sdk/tracing.py
Expand Up @@ -5,7 +5,12 @@

import sentry_sdk
from sentry_sdk.consts import INSTRUMENTER
from sentry_sdk.utils import is_valid_sample_rate, logger, nanosecond_time
from sentry_sdk.utils import (
get_current_thread_meta,
is_valid_sample_rate,
logger,
nanosecond_time,
)
from sentry_sdk._compat import datetime_utcnow, utc_from_timestamp, PY2
from sentry_sdk.consts import SPANDATA
from sentry_sdk._types import TYPE_CHECKING
Expand Down Expand Up @@ -172,6 +177,9 @@ def __init__(
self._span_recorder = None # type: Optional[_SpanRecorder]
self._local_aggregator = None # type: Optional[LocalAggregator]

thread_id, thread_name = get_current_thread_meta()
self.set_thread(thread_id, thread_name)

# TODO this should really live on the Transaction class rather than the Span
# class
def init_span_recorder(self, maxlen):
Expand Down Expand Up @@ -418,6 +426,15 @@ def set_status(self, value):
# type: (str) -> None
self.status = value

def set_thread(self, thread_id, thread_name):
    # type: (Optional[int], Optional[str]) -> None
    """
    Attach thread information to this span's data.

    :param thread_id: Id of the thread; skipped when ``None``.
    :param thread_name: Name of the thread; skipped when ``None``.
    """

    if thread_id is not None:
        # SPANDATA.THREAD_ID is documented as a string (e.g. "7972576320"),
        # while thread ids are ints, so convert before storing.
        self.set_data(SPANDATA.THREAD_ID, str(thread_id))

    if thread_name is not None:
        self.set_data(SPANDATA.THREAD_NAME, thread_name)

def set_http_status(self, http_status):
# type: (int) -> None
self.set_tag(
Expand Down
55 changes: 55 additions & 0 deletions sentry_sdk/utils.py
Expand Up @@ -1746,8 +1746,12 @@ def now():


try:
from gevent import get_hub as get_gevent_hub
from gevent.monkey import is_module_patched
except ImportError:
def get_gevent_hub():
# type: () -> Any
return None

Check warning on line 1754 in sentry_sdk/utils.py

View check run for this annotation

Codecov / codecov/patch

sentry_sdk/utils.py#L1754

Added line #L1754 was not covered by tests

def is_module_patched(*args, **kwargs):
# type: (*Any, **Any) -> bool
Expand All @@ -1758,3 +1762,54 @@ def is_module_patched(*args, **kwargs):
def is_gevent():
    # type: () -> bool
    """
    Report whether ``is_module_patched`` considers either the ``threading``
    or the ``_thread`` module to be patched.
    """
    threading_patched = is_module_patched("threading")
    if threading_patched:
        return threading_patched
    return is_module_patched("_thread")


def get_current_thread_meta(thread=None):
# type: (Optional[threading.Thread]) -> Tuple[Optional[int], Optional[str]]
"""
Try to get the id and name of the current thread, with various fall backs.
"""

# if a thread is specified, that takes priority
if thread is not None:
try:
thread_id = thread.ident
thread_name = thread.name

Check warning on line 1777 in sentry_sdk/utils.py

View check run for this annotation

Codecov / codecov/patch

sentry_sdk/utils.py#L1775-L1777

Added lines #L1775 - L1777 were not covered by tests
if thread_id is not None:
return thread_id, thread_name
except AttributeError:
pass

Check warning on line 1781 in sentry_sdk/utils.py

View check run for this annotation

Codecov / codecov/patch

sentry_sdk/utils.py#L1779-L1781

Added lines #L1779 - L1781 were not covered by tests

# if the app is using gevent, we should look at the gevent hub first
# as the id there differs from what the threading module reports
if is_gevent():
gevent_hub = get_gevent_hub()

Check warning on line 1786 in sentry_sdk/utils.py

View check run for this annotation

Codecov / codecov/patch

sentry_sdk/utils.py#L1786

Added line #L1786 was not covered by tests
if gevent_hub is not None:
try:

Check warning on line 1788 in sentry_sdk/utils.py

View check run for this annotation

Codecov / codecov/patch

sentry_sdk/utils.py#L1788

Added line #L1788 was not covered by tests
# this is undocumented, so wrap it in try except to be safe
return gevent_hub.thread_ident, gevent_hub.name
except AttributeError:
pass

Check warning on line 1792 in sentry_sdk/utils.py

View check run for this annotation

Codecov / codecov/patch

sentry_sdk/utils.py#L1790-L1792

Added lines #L1790 - L1792 were not covered by tests

# use the current thread's id if possible
try:
thread = threading.current_thread()
thread_id = thread.ident
thread_name = thread.name
if thread_id is not None:
return thread_id, thread_name
except AttributeError:
pass

Check warning on line 1802 in sentry_sdk/utils.py

View check run for this annotation

Codecov / codecov/patch

sentry_sdk/utils.py#L1801-L1802

Added lines #L1801 - L1802 were not covered by tests

# if we can't get the current thread id, fall back to the main thread id
try:
thread = threading.main_thread()
thread_id = thread.ident
thread_name = thread.name

Check warning on line 1808 in sentry_sdk/utils.py

View check run for this annotation

Codecov / codecov/patch

sentry_sdk/utils.py#L1805-L1808

Added lines #L1805 - L1808 were not covered by tests
if thread_id is not None:
return thread_id, thread_name
except AttributeError:
pass

Check warning on line 1812 in sentry_sdk/utils.py

View check run for this annotation

Codecov / codecov/patch

sentry_sdk/utils.py#L1810-L1812

Added lines #L1810 - L1812 were not covered by tests

# we've tried everything, time to give up
return None, None

Check warning on line 1815 in sentry_sdk/utils.py

View check run for this annotation

Codecov / codecov/patch

sentry_sdk/utils.py#L1815

Added line #L1815 was not covered by tests
23 changes: 0 additions & 23 deletions tests/test_profiler.py
Expand Up @@ -16,7 +16,6 @@
extract_frame,
extract_stack,
frame_id,
get_current_thread_id,
get_frame_name,
setup_profiler,
)
Expand Down Expand Up @@ -556,28 +555,6 @@ def test_extract_stack_with_cache(frame, depth):
assert frame1 is frame2, i


@requires_python_version(3, 3)
def test_get_current_thread_id_explicit_thread():
    """An explicitly passed thread takes priority over the calling thread."""
    captured = Queue(maxsize=1)

    def noop():
        pass

    def report_other_thread():
        # runs on `reporter` but asks about `other`
        captured.put(get_current_thread_id(other))

    other = threading.Thread(target=noop)
    other.start()

    reporter = threading.Thread(target=report_other_thread)
    reporter.start()

    reporter.join()
    other.join()

    expected_ident = other.ident
    assert expected_ident == captured.get(timeout=1)


@requires_python_version(3, 3)
@requires_gevent
def test_get_current_thread_id_gevent_in_thread():
Expand Down
73 changes: 73 additions & 0 deletions tests/test_utils.py
@@ -1,12 +1,15 @@
import pytest
import re
import sys
import threading
from datetime import timedelta

from sentry_sdk._compat import duration_in_milliseconds
from sentry_sdk._queue import Queue
from sentry_sdk.utils import (
Components,
Dsn,
get_current_thread_meta,
get_default_release,
get_error_message,
get_git_revision,
Expand All @@ -29,6 +32,11 @@
except ImportError:
import mock # python < 3.3

try:
import gevent
except ImportError:
gevent = None

try:
# Python 3
FileNotFoundError
Expand Down Expand Up @@ -607,3 +615,68 @@ def test_default_release_empty_string():
)
def test_duration_in_milliseconds(timedelta, expected_milliseconds):
assert duration_in_milliseconds(timedelta) == expected_milliseconds


def test_get_current_thread_id_explicit_thread():
    """An explicitly passed thread takes priority over the calling thread."""
    captured = Queue(maxsize=1)

    def noop():
        pass

    def report_other_thread():
        # runs on `reporter` but asks about `other`
        captured.put(get_current_thread_meta(other))

    other = threading.Thread(target=noop)
    other.start()

    reporter = threading.Thread(target=report_other_thread)
    reporter.start()

    reporter.join()
    other.join()

    expected_meta = (other.ident, other.name)
    assert expected_meta == captured.get(timeout=1)


@pytest.mark.skipif(gevent is None, reason="gevent not enabled")
def test_get_current_thread_id_gevent_in_thread():
    """Metadata gathered in a greenlet spawned on a worker thread matches that thread."""
    captured = Queue(maxsize=1)

    def run_in_greenlet():
        greenlet = gevent.spawn(get_current_thread_meta)
        greenlet.join()
        captured.put(greenlet.value)

    worker = threading.Thread(target=run_in_greenlet)
    worker.start()
    worker.join()

    expected_meta = (worker.ident, worker.name)
    assert expected_meta == captured.get(timeout=1)


def test_get_current_thread_id_running_thread():
    """Called without arguments on a worker thread, the worker's own metadata is returned."""
    captured = Queue(maxsize=1)

    def report_own_thread():
        captured.put(get_current_thread_meta())

    worker = threading.Thread(target=report_own_thread)
    worker.start()
    worker.join()

    expected_meta = (worker.ident, worker.name)
    assert expected_meta == captured.get(timeout=1)


@pytest.mark.skipif(sys.version_info < (3, 4), reason="threading.main_thread() Not available")
def test_get_current_thread_id_main_thread():
    """When the current thread cannot be resolved, the main thread's metadata is used."""
    captured = Queue(maxsize=1)

    def report_without_current_thread():
        # mock that somehow the current thread doesn't exist
        patcher = mock.patch("threading.current_thread", side_effect=[None])
        with patcher:
            captured.put(get_current_thread_meta())

    main = threading.main_thread()

    worker = threading.Thread(target=report_without_current_thread)
    worker.start()
    worker.join()

    expected_meta = (main.ident, main.name)
    assert expected_meta == captured.get(timeout=1)

0 comments on commit f44dec5

Please sign in to comment.