Skip to content

Commit

Permalink
feat(profiling): Better gevent support (#1822)
Browse files Browse the repository at this point in the history
We're missing frames from gevent threads. Using `gevent.threadpool.ThreadPool`
seems to fix that. The monkey patching gevent does is causing the sampler thread
to run in a greenlet on the same thread as the all other greenlets. So when it
is taking a sample, the sampler is current greenlet thus no useful stacks can be
seen.
  • Loading branch information
Zylphrex committed Jan 17, 2023
1 parent 1445c73 commit ffe7737
Show file tree
Hide file tree
Showing 2 changed files with 173 additions and 67 deletions.
183 changes: 127 additions & 56 deletions sentry_sdk/profiler.py
Expand Up @@ -104,6 +104,15 @@
},
)

try:
from gevent.monkey import is_module_patched # type: ignore
except ImportError:

def is_module_patched(*args, **kwargs):
# type: (*Any, **Any) -> bool
# unable to import from gevent means no modules have been patched
return False


_scheduler = None # type: Optional[Scheduler]

Expand All @@ -128,11 +137,31 @@ def setup_profiler(options):

frequency = 101

profiler_mode = options["_experiments"].get("profiler_mode", SleepScheduler.mode)
if profiler_mode == SleepScheduler.mode:
_scheduler = SleepScheduler(frequency=frequency)
if is_module_patched("threading") or is_module_patched("_thread"):
# If gevent has patched the threading modules then we cannot rely on
# them to spawn a native thread for sampling.
# Instead we default to the GeventScheduler which is capable of
# spawning native threads within gevent.
default_profiler_mode = GeventScheduler.mode
else:
default_profiler_mode = ThreadScheduler.mode

profiler_mode = options["_experiments"].get("profiler_mode", default_profiler_mode)

if (
profiler_mode == ThreadScheduler.mode
# for legacy reasons, we'll keep supporting sleep mode for this scheduler
or profiler_mode == "sleep"
):
_scheduler = ThreadScheduler(frequency=frequency)
elif profiler_mode == GeventScheduler.mode:
try:
_scheduler = GeventScheduler(frequency=frequency)
except ImportError:
raise ValueError("Profiler mode: {} is not available".format(profiler_mode))
else:
raise ValueError("Unknown profiler mode: {}".format(profiler_mode))

_scheduler.setup()

atexit.register(teardown_profiler)
Expand Down Expand Up @@ -445,6 +474,11 @@ def __init__(self, frequency):
# type: (int) -> None
self.interval = 1.0 / frequency

self.sampler = self.make_sampler()

self.new_profiles = deque() # type: Deque[Profile]
self.active_profiles = set() # type: Set[Profile]

def __enter__(self):
# type: () -> Scheduler
self.setup()
Expand All @@ -462,50 +496,6 @@ def teardown(self):
# type: () -> None
raise NotImplementedError

def start_profiling(self, profile):
# type: (Profile) -> None
raise NotImplementedError

def stop_profiling(self, profile):
# type: (Profile) -> None
raise NotImplementedError


class ThreadScheduler(Scheduler):
"""
This abstract scheduler is based on running a daemon thread that will call
the sampler at a regular interval.
"""

mode = "thread"
name = None # type: Optional[str]

def __init__(self, frequency):
# type: (int) -> None
super(ThreadScheduler, self).__init__(frequency=frequency)

self.sampler = self.make_sampler()

# used to signal to the thread that it should stop
self.event = threading.Event()

# make sure the thread is a daemon here otherwise this
# can keep the application running after other threads
# have exited
self.thread = threading.Thread(name=self.name, target=self.run, daemon=True)

self.new_profiles = deque() # type: Deque[Profile]
self.active_profiles = set() # type: Set[Profile]

def setup(self):
# type: () -> None
self.thread.start()

def teardown(self):
# type: () -> None
self.event.set()
self.thread.join()

def start_profiling(self, profile):
# type: (Profile) -> None
profile.active = True
Expand All @@ -515,10 +505,6 @@ def stop_profiling(self, profile):
# type: (Profile) -> None
profile.active = False

def run(self):
# type: () -> None
raise NotImplementedError

def make_sampler(self):
# type: () -> Callable[..., None]
cwd = os.getcwd()
Expand Down Expand Up @@ -600,14 +586,99 @@ def _sample_stack(*args, **kwargs):
return _sample_stack


class SleepScheduler(ThreadScheduler):
class ThreadScheduler(Scheduler):
"""
This scheduler uses time.sleep to wait the required interval before calling
the sampling function.
This scheduler is based on running a daemon thread that will call
the sampler at a regular interval.
"""

mode = "sleep"
name = "sentry.profiler.SleepScheduler"
mode = "thread"
name = "sentry.profiler.ThreadScheduler"

def __init__(self, frequency):
# type: (int) -> None
super(ThreadScheduler, self).__init__(frequency=frequency)

# used to signal to the thread that it should stop
self.event = threading.Event()

# make sure the thread is a daemon here otherwise this
# can keep the application running after other threads
# have exited
self.thread = threading.Thread(name=self.name, target=self.run, daemon=True)

def setup(self):
# type: () -> None
self.thread.start()

def teardown(self):
# type: () -> None
self.event.set()
self.thread.join()

def run(self):
# type: () -> None
last = time.perf_counter()

while True:
if self.event.is_set():
break

self.sampler()

# some time may have elapsed since the last time
# we sampled, so we need to account for that and
# not sleep for too long
elapsed = time.perf_counter() - last
if elapsed < self.interval:
time.sleep(self.interval - elapsed)

# after sleeping, make sure to take the current
# timestamp so we can use it next iteration
last = time.perf_counter()


class GeventScheduler(Scheduler):
"""
This scheduler is based on the thread scheduler but adapted to work with
gevent. When using gevent, it may monkey patch the threading modules
(`threading` and `_thread`). This results in the use of greenlets instead
of native threads.
This is an issue because the sampler CANNOT run in a greenlet because
1. Other greenlets doing sync work will prevent the sampler from running
2. The greenlet runs in the same thread as other greenlets so when taking
a sample, other greenlets will have been evicted from the thread. This
results in a sample containing only the sampler's code.
"""

mode = "gevent"
name = "sentry.profiler.GeventScheduler"

def __init__(self, frequency):
# type: (int) -> None

# This can throw an ImportError that must be caught if `gevent` is
# not installed.
from gevent.threadpool import ThreadPool # type: ignore

super(GeventScheduler, self).__init__(frequency=frequency)

# used to signal to the thread that it should stop
self.event = threading.Event()

# Using gevent's ThreadPool allows us to bypass greenlets and spawn
# native threads.
self.pool = ThreadPool(1)

def setup(self):
# type: () -> None
self.pool.spawn(self.run)

def teardown(self):
# type: () -> None
self.event.set()
self.pool.join()

def run(self):
# type: () -> None
Expand Down
57 changes: 46 additions & 11 deletions tests/test_profiler.py
Expand Up @@ -6,32 +6,56 @@
import pytest

from sentry_sdk.profiler import (
GeventScheduler,
Profile,
SleepScheduler,
ThreadScheduler,
extract_frame,
extract_stack,
get_frame_name,
setup_profiler,
)
from sentry_sdk.tracing import Transaction

try:
import gevent
except ImportError:
gevent = None


minimum_python_33 = pytest.mark.skipif(
sys.version_info < (3, 3), reason="Profiling is only supported in Python >= 3.3"
)

requires_gevent = pytest.mark.skipif(gevent is None, reason="gevent not enabled")


def process_test_sample(sample):
return [(tid, (stack, stack)) for tid, stack in sample]


@minimum_python_33
def test_profiler_invalid_mode(teardown_profiling):
@pytest.mark.parametrize(
"mode",
[
pytest.param("foo"),
pytest.param(
"gevent",
marks=pytest.mark.skipif(gevent is not None, reason="gevent not enabled"),
),
],
)
def test_profiler_invalid_mode(mode, teardown_profiling):
with pytest.raises(ValueError):
setup_profiler({"_experiments": {"profiler_mode": "magic"}})
setup_profiler({"_experiments": {"profiler_mode": mode}})


@pytest.mark.parametrize("mode", ["sleep"])
@pytest.mark.parametrize(
"mode",
[
pytest.param("thread"),
pytest.param("sleep"),
pytest.param("gevent", marks=requires_gevent),
],
)
def test_profiler_valid_mode(mode, teardown_profiling):
# should not raise any exceptions
setup_profiler({"_experiments": {"profiler_mode": mode}})
Expand All @@ -56,7 +80,6 @@ def inherited_instance_method(self):

def inherited_instance_method_wrapped(self):
def wrapped():
self
return inspect.currentframe()

return wrapped
Expand All @@ -68,7 +91,6 @@ def inherited_class_method(cls):
@classmethod
def inherited_class_method_wrapped(cls):
def wrapped():
cls
return inspect.currentframe()

return wrapped
Expand All @@ -84,7 +106,6 @@ def instance_method(self):

def instance_method_wrapped(self):
def wrapped():
self
return inspect.currentframe()

return wrapped
Expand All @@ -96,7 +117,6 @@ def class_method(cls):
@classmethod
def class_method_wrapped(cls):
def wrapped():
cls
return inspect.currentframe()

return wrapped
Expand Down Expand Up @@ -258,7 +278,19 @@ def get_scheduler_threads(scheduler):
@minimum_python_33
@pytest.mark.parametrize(
("scheduler_class",),
[pytest.param(SleepScheduler, id="sleep scheduler")],
[
pytest.param(ThreadScheduler, id="thread scheduler"),
pytest.param(
GeventScheduler,
marks=[
requires_gevent,
pytest.mark.skip(
reason="cannot find this thread via threading.enumerate()"
),
],
id="gevent scheduler",
),
],
)
def test_thread_scheduler_single_background_thread(scheduler_class):
scheduler = scheduler_class(frequency=1000)
Expand Down Expand Up @@ -576,7 +608,10 @@ def test_thread_scheduler_single_background_thread(scheduler_class):
)
@pytest.mark.parametrize(
("scheduler_class",),
[pytest.param(SleepScheduler, id="sleep scheduler")],
[
pytest.param(ThreadScheduler, id="thread scheduler"),
pytest.param(GeventScheduler, marks=requires_gevent, id="gevent scheduler"),
],
)
def test_profile_processing(
DictionaryContaining, # noqa: N803
Expand Down

0 comments on commit ffe7737

Please sign in to comment.