From ffe773745120289d05b66feb3d1194757d88fc02 Mon Sep 17 00:00:00 2001 From: Tony Xiao Date: Tue, 17 Jan 2023 14:11:06 -0500 Subject: [PATCH] feat(profiling): Better gevent support (#1822) We're missing frames from gevent threads. Using `gevent.threadpool.ThreadPool` seems to fix that. The monkey patching gevent does is causing the sampler thread to run in a greenlet on the same thread as the all other greenlets. So when it is taking a sample, the sampler is current greenlet thus no useful stacks can be seen. --- sentry_sdk/profiler.py | 183 ++++++++++++++++++++++++++++------------- tests/test_profiler.py | 57 ++++++++++--- 2 files changed, 173 insertions(+), 67 deletions(-) diff --git a/sentry_sdk/profiler.py b/sentry_sdk/profiler.py index 81ba8f5753..20ac90f588 100644 --- a/sentry_sdk/profiler.py +++ b/sentry_sdk/profiler.py @@ -104,6 +104,15 @@ }, ) +try: + from gevent.monkey import is_module_patched # type: ignore +except ImportError: + + def is_module_patched(*args, **kwargs): + # type: (*Any, **Any) -> bool + # unable to import from gevent means no modules have been patched + return False + _scheduler = None # type: Optional[Scheduler] @@ -128,11 +137,31 @@ def setup_profiler(options): frequency = 101 - profiler_mode = options["_experiments"].get("profiler_mode", SleepScheduler.mode) - if profiler_mode == SleepScheduler.mode: - _scheduler = SleepScheduler(frequency=frequency) + if is_module_patched("threading") or is_module_patched("_thread"): + # If gevent has patched the threading modules then we cannot rely on + # them to spawn a native thread for sampling. + # Instead we default to the GeventScheduler which is capable of + # spawning native threads within gevent. + default_profiler_mode = GeventScheduler.mode + else: + default_profiler_mode = ThreadScheduler.mode + + profiler_mode = options["_experiments"].get("profiler_mode", default_profiler_mode) + + if ( + profiler_mode == ThreadScheduler.mode + # for legacy reasons, we'll keep supporting sleep mode for this scheduler + or profiler_mode == "sleep" + ): + _scheduler = ThreadScheduler(frequency=frequency) + elif profiler_mode == GeventScheduler.mode: + try: + _scheduler = GeventScheduler(frequency=frequency) + except ImportError: + raise ValueError("Profiler mode: {} is not available".format(profiler_mode)) else: raise ValueError("Unknown profiler mode: {}".format(profiler_mode)) + _scheduler.setup() atexit.register(teardown_profiler) @@ -445,6 +474,11 @@ def __init__(self, frequency): # type: (int) -> None self.interval = 1.0 / frequency + self.sampler = self.make_sampler() + + self.new_profiles = deque() # type: Deque[Profile] + self.active_profiles = set() # type: Set[Profile] + def __enter__(self): # type: () -> Scheduler self.setup() @@ -462,50 +496,6 @@ def teardown(self): # type: () -> None raise NotImplementedError - def start_profiling(self, profile): - # type: (Profile) -> None - raise NotImplementedError - - def stop_profiling(self, profile): - # type: (Profile) -> None - raise NotImplementedError - - -class ThreadScheduler(Scheduler): - """ - This abstract scheduler is based on running a daemon thread that will call - the sampler at a regular interval. - """ - - mode = "thread" - name = None # type: Optional[str] - - def __init__(self, frequency): - # type: (int) -> None - super(ThreadScheduler, self).__init__(frequency=frequency) - - self.sampler = self.make_sampler() - - # used to signal to the thread that it should stop - self.event = threading.Event() - - # make sure the thread is a daemon here otherwise this - # can keep the application running after other threads - # have exited - self.thread = threading.Thread(name=self.name, target=self.run, daemon=True) - - self.new_profiles = deque() # type: Deque[Profile] - self.active_profiles = set() # type: Set[Profile] - - def setup(self): - # type: () -> None - self.thread.start() - - def teardown(self): - # type: () -> None - self.event.set() - self.thread.join() - def start_profiling(self, profile): # type: (Profile) -> None profile.active = True @@ -515,10 +505,6 @@ def stop_profiling(self, profile): # type: (Profile) -> None profile.active = False - def run(self): - # type: () -> None - raise NotImplementedError - def make_sampler(self): # type: () -> Callable[..., None] cwd = os.getcwd() @@ -600,14 +586,99 @@ def _sample_stack(*args, **kwargs): return _sample_stack -class SleepScheduler(ThreadScheduler): +class ThreadScheduler(Scheduler): """ - This scheduler uses time.sleep to wait the required interval before calling - the sampling function. + This scheduler is based on running a daemon thread that will call + the sampler at a regular interval. """ - mode = "sleep" - name = "sentry.profiler.SleepScheduler" + mode = "thread" + name = "sentry.profiler.ThreadScheduler" + + def __init__(self, frequency): + # type: (int) -> None + super(ThreadScheduler, self).__init__(frequency=frequency) + + # used to signal to the thread that it should stop + self.event = threading.Event() + + # make sure the thread is a daemon here otherwise this + # can keep the application running after other threads + # have exited + self.thread = threading.Thread(name=self.name, target=self.run, daemon=True) + + def setup(self): + # type: () -> None + self.thread.start() + + def teardown(self): + # type: () -> None + self.event.set() + self.thread.join() + + def run(self): + # type: () -> None + last = time.perf_counter() + + while True: + if self.event.is_set(): + break + + self.sampler() + + # some time may have elapsed since the last time + # we sampled, so we need to account for that and + # not sleep for too long + elapsed = time.perf_counter() - last + if elapsed < self.interval: + time.sleep(self.interval - elapsed) + + # after sleeping, make sure to take the current + # timestamp so we can use it next iteration + last = time.perf_counter() + + +class GeventScheduler(Scheduler): + """ + This scheduler is based on the thread scheduler but adapted to work with + gevent. When using gevent, it may monkey patch the threading modules + (`threading` and `_thread`). This results in the use of greenlets instead + of native threads. + + This is an issue because the sampler CANNOT run in a greenlet because + 1. Other greenlets doing sync work will prevent the sampler from running + 2. The greenlet runs in the same thread as other greenlets so when taking + a sample, other greenlets will have been evicted from the thread. This + results in a sample containing only the sampler's code. + """ + + mode = "gevent" + name = "sentry.profiler.GeventScheduler" + + def __init__(self, frequency): + # type: (int) -> None + + # This can throw an ImportError that must be caught if `gevent` is + # not installed. + from gevent.threadpool import ThreadPool # type: ignore + + super(GeventScheduler, self).__init__(frequency=frequency) + + # used to signal to the thread that it should stop + self.event = threading.Event() + + # Using gevent's ThreadPool allows us to bypass greenlets and spawn + # native threads. + self.pool = ThreadPool(1) + + def setup(self): + # type: () -> None + self.pool.spawn(self.run) + + def teardown(self): + # type: () -> None + self.event.set() + self.pool.join() def run(self): # type: () -> None diff --git a/tests/test_profiler.py b/tests/test_profiler.py index 44474343ce..115e2f91ca 100644 --- a/tests/test_profiler.py +++ b/tests/test_profiler.py @@ -6,8 +6,9 @@ import pytest from sentry_sdk.profiler import ( + GeventScheduler, Profile, - SleepScheduler, + ThreadScheduler, extract_frame, extract_stack, get_frame_name, @@ -15,23 +16,46 @@ ) from sentry_sdk.tracing import Transaction +try: + import gevent +except ImportError: + gevent = None + minimum_python_33 = pytest.mark.skipif( sys.version_info < (3, 3), reason="Profiling is only supported in Python >= 3.3" ) +requires_gevent = pytest.mark.skipif(gevent is None, reason="gevent not enabled") + def process_test_sample(sample): return [(tid, (stack, stack)) for tid, stack in sample] -@minimum_python_33 -def test_profiler_invalid_mode(teardown_profiling): +@pytest.mark.parametrize( + "mode", + [ + pytest.param("foo"), + pytest.param( + "gevent", + marks=pytest.mark.skipif(gevent is not None, reason="gevent not enabled"), + ), + ], +) +def test_profiler_invalid_mode(mode, teardown_profiling): with pytest.raises(ValueError): - setup_profiler({"_experiments": {"profiler_mode": "magic"}}) + setup_profiler({"_experiments": {"profiler_mode": mode}}) -@pytest.mark.parametrize("mode", ["sleep"]) +@pytest.mark.parametrize( + "mode", + [ + pytest.param("thread"), + pytest.param("sleep"), + pytest.param("gevent", marks=requires_gevent), + ], +) def test_profiler_valid_mode(mode, teardown_profiling): # should not raise any exceptions setup_profiler({"_experiments": {"profiler_mode": mode}}) @@ -56,7 +80,6 @@ def inherited_instance_method(self): def inherited_instance_method_wrapped(self): def wrapped(): - self return inspect.currentframe() return wrapped @@ -68,7 +91,6 @@ def inherited_class_method(cls): @classmethod def inherited_class_method_wrapped(cls): def wrapped(): - cls return inspect.currentframe() return wrapped @@ -84,7 +106,6 @@ def instance_method(self): def instance_method_wrapped(self): def wrapped(): - self return inspect.currentframe() return wrapped @@ -96,7 +117,6 @@ def class_method(cls): @classmethod def class_method_wrapped(cls): def wrapped(): - cls return inspect.currentframe() return wrapped @@ -258,7 +278,19 @@ def get_scheduler_threads(scheduler): @minimum_python_33 @pytest.mark.parametrize( ("scheduler_class",), - [pytest.param(SleepScheduler, id="sleep scheduler")], + [ + pytest.param(ThreadScheduler, id="thread scheduler"), + pytest.param( + GeventScheduler, + marks=[ + requires_gevent, + pytest.mark.skip( + reason="cannot find this thread via threading.enumerate()" + ), + ], + id="gevent scheduler", + ), + ], ) def test_thread_scheduler_single_background_thread(scheduler_class): scheduler = scheduler_class(frequency=1000) @@ -576,7 +608,10 @@ def test_thread_scheduler_single_background_thread(scheduler_class): ) @pytest.mark.parametrize( ("scheduler_class",), - [pytest.param(SleepScheduler, id="sleep scheduler")], + [ + pytest.param(ThreadScheduler, id="thread scheduler"), + pytest.param(GeventScheduler, marks=requires_gevent, id="gevent scheduler"), + ], ) def test_profile_processing( DictionaryContaining, # noqa: N803