Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(profiling): Better gevent support #1822

Merged
merged 4 commits into from Jan 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
183 changes: 127 additions & 56 deletions sentry_sdk/profiler.py
Expand Up @@ -104,6 +104,15 @@
},
)

try:
from gevent.monkey import is_module_patched # type: ignore
except ImportError:

def is_module_patched(*args, **kwargs):
# type: (*Any, **Any) -> bool
# unable to import from gevent means no modules have been patched
return False


_scheduler = None # type: Optional[Scheduler]

Expand All @@ -128,11 +137,31 @@ def setup_profiler(options):

frequency = 101

profiler_mode = options["_experiments"].get("profiler_mode", SleepScheduler.mode)
if profiler_mode == SleepScheduler.mode:
_scheduler = SleepScheduler(frequency=frequency)
if is_module_patched("threading") or is_module_patched("_thread"):
# If gevent has patched the threading modules then we cannot rely on
# them to spawn a native thread for sampling.
# Instead we default to the GeventScheduler which is capable of
# spawning native threads within gevent.
default_profiler_mode = GeventScheduler.mode
else:
default_profiler_mode = ThreadScheduler.mode

profiler_mode = options["_experiments"].get("profiler_mode", default_profiler_mode)

if (
profiler_mode == ThreadScheduler.mode
# for legacy reasons, we'll keep supporting sleep mode for this scheduler
or profiler_mode == "sleep"
):
_scheduler = ThreadScheduler(frequency=frequency)
elif profiler_mode == GeventScheduler.mode:
try:
_scheduler = GeventScheduler(frequency=frequency)
except ImportError:
raise ValueError("Profiler mode: {} is not available".format(profiler_mode))
else:
raise ValueError("Unknown profiler mode: {}".format(profiler_mode))

_scheduler.setup()

atexit.register(teardown_profiler)
Expand Down Expand Up @@ -445,6 +474,11 @@ def __init__(self, frequency):
# type: (int) -> None
self.interval = 1.0 / frequency

self.sampler = self.make_sampler()

self.new_profiles = deque() # type: Deque[Profile]
self.active_profiles = set() # type: Set[Profile]

def __enter__(self):
# type: () -> Scheduler
self.setup()
Expand All @@ -462,50 +496,6 @@ def teardown(self):
# type: () -> None
raise NotImplementedError

def start_profiling(self, profile):
# type: (Profile) -> None
raise NotImplementedError

def stop_profiling(self, profile):
# type: (Profile) -> None
raise NotImplementedError


class ThreadScheduler(Scheduler):
"""
This abstract scheduler is based on running a daemon thread that will call
the sampler at a regular interval.
"""

mode = "thread"
name = None # type: Optional[str]

def __init__(self, frequency):
# type: (int) -> None
super(ThreadScheduler, self).__init__(frequency=frequency)

self.sampler = self.make_sampler()

# used to signal to the thread that it should stop
self.event = threading.Event()

# make sure the thread is a daemon here otherwise this
# can keep the application running after other threads
# have exited
self.thread = threading.Thread(name=self.name, target=self.run, daemon=True)

self.new_profiles = deque() # type: Deque[Profile]
self.active_profiles = set() # type: Set[Profile]

def setup(self):
# type: () -> None
self.thread.start()

def teardown(self):
# type: () -> None
self.event.set()
self.thread.join()

def start_profiling(self, profile):
# type: (Profile) -> None
profile.active = True
Expand All @@ -515,10 +505,6 @@ def stop_profiling(self, profile):
# type: (Profile) -> None
profile.active = False

def run(self):
# type: () -> None
raise NotImplementedError

def make_sampler(self):
# type: () -> Callable[..., None]
cwd = os.getcwd()
Expand Down Expand Up @@ -600,14 +586,99 @@ def _sample_stack(*args, **kwargs):
return _sample_stack


class SleepScheduler(ThreadScheduler):
class ThreadScheduler(Scheduler):
"""
This scheduler uses time.sleep to wait the required interval before calling
the sampling function.
This scheduler is based on running a daemon thread that will call
the sampler at a regular interval.
"""

mode = "sleep"
name = "sentry.profiler.SleepScheduler"
mode = "thread"
name = "sentry.profiler.ThreadScheduler"

def __init__(self, frequency):
# type: (int) -> None
super(ThreadScheduler, self).__init__(frequency=frequency)

# used to signal to the thread that it should stop
self.event = threading.Event()

# make sure the thread is a daemon here otherwise this
# can keep the application running after other threads
# have exited
self.thread = threading.Thread(name=self.name, target=self.run, daemon=True)

def setup(self):
# type: () -> None
self.thread.start()

def teardown(self):
# type: () -> None
self.event.set()
self.thread.join()

def run(self):
# type: () -> None
last = time.perf_counter()

while True:
if self.event.is_set():
break

self.sampler()

# some time may have elapsed since the last time
# we sampled, so we need to account for that and
# not sleep for too long
elapsed = time.perf_counter() - last
if elapsed < self.interval:
time.sleep(self.interval - elapsed)

# after sleeping, make sure to take the current
# timestamp so we can use it next iteration
last = time.perf_counter()


class GeventScheduler(Scheduler):
"""
This scheduler is based on the thread scheduler but adapted to work with
gevent. When using gevent, it may monkey patch the threading modules
(`threading` and `_thread`). This results in the use of greenlets instead
of native threads.

This is an issue because the sampler CANNOT run in a greenlet because
1. Other greenlets doing sync work will prevent the sampler from running
2. The greenlet runs in the same thread as other greenlets so when taking
a sample, other greenlets will have been evicted from the thread. This
results in a sample containing only the sampler's code.
"""

mode = "gevent"
name = "sentry.profiler.GeventScheduler"

def __init__(self, frequency):
# type: (int) -> None

# This can throw an ImportError that must be caught if `gevent` is
# not installed.
from gevent.threadpool import ThreadPool # type: ignore

super(GeventScheduler, self).__init__(frequency=frequency)

# used to signal to the thread that it should stop
self.event = threading.Event()

# Using gevent's ThreadPool allows us to bypass greenlets and spawn
# native threads.
self.pool = ThreadPool(1)

def setup(self):
# type: () -> None
self.pool.spawn(self.run)

def teardown(self):
# type: () -> None
self.event.set()
self.pool.join()

def run(self):
# type: () -> None
Expand Down
57 changes: 46 additions & 11 deletions tests/test_profiler.py
Expand Up @@ -6,32 +6,56 @@
import pytest

from sentry_sdk.profiler import (
GeventScheduler,
Profile,
SleepScheduler,
ThreadScheduler,
extract_frame,
extract_stack,
get_frame_name,
setup_profiler,
)
from sentry_sdk.tracing import Transaction

try:
import gevent
except ImportError:
gevent = None


minimum_python_33 = pytest.mark.skipif(
sys.version_info < (3, 3), reason="Profiling is only supported in Python >= 3.3"
)

requires_gevent = pytest.mark.skipif(gevent is None, reason="gevent not enabled")


def process_test_sample(sample):
return [(tid, (stack, stack)) for tid, stack in sample]


@minimum_python_33
def test_profiler_invalid_mode(teardown_profiling):
@pytest.mark.parametrize(
"mode",
[
pytest.param("foo"),
pytest.param(
"gevent",
marks=pytest.mark.skipif(gevent is not None, reason="gevent not enabled"),
),
],
)
def test_profiler_invalid_mode(mode, teardown_profiling):
with pytest.raises(ValueError):
setup_profiler({"_experiments": {"profiler_mode": "magic"}})
setup_profiler({"_experiments": {"profiler_mode": mode}})


@pytest.mark.parametrize("mode", ["sleep"])
@pytest.mark.parametrize(
"mode",
[
pytest.param("thread"),
pytest.param("sleep"),
pytest.param("gevent", marks=requires_gevent),
],
)
def test_profiler_valid_mode(mode, teardown_profiling):
# should not raise any exceptions
setup_profiler({"_experiments": {"profiler_mode": mode}})
Expand All @@ -56,7 +80,6 @@ def inherited_instance_method(self):

def inherited_instance_method_wrapped(self):
def wrapped():
self
return inspect.currentframe()

return wrapped
Expand All @@ -68,7 +91,6 @@ def inherited_class_method(cls):
@classmethod
def inherited_class_method_wrapped(cls):
def wrapped():
cls
return inspect.currentframe()

return wrapped
Expand All @@ -84,7 +106,6 @@ def instance_method(self):

def instance_method_wrapped(self):
def wrapped():
self
return inspect.currentframe()

return wrapped
Expand All @@ -96,7 +117,6 @@ def class_method(cls):
@classmethod
def class_method_wrapped(cls):
def wrapped():
cls
return inspect.currentframe()

return wrapped
Expand Down Expand Up @@ -258,7 +278,19 @@ def get_scheduler_threads(scheduler):
@minimum_python_33
@pytest.mark.parametrize(
("scheduler_class",),
[pytest.param(SleepScheduler, id="sleep scheduler")],
[
pytest.param(ThreadScheduler, id="thread scheduler"),
pytest.param(
GeventScheduler,
marks=[
requires_gevent,
pytest.mark.skip(
reason="cannot find this thread via threading.enumerate()"
),
],
id="gevent scheduler",
),
],
)
def test_thread_scheduler_single_background_thread(scheduler_class):
scheduler = scheduler_class(frequency=1000)
Expand Down Expand Up @@ -576,7 +608,10 @@ def test_thread_scheduler_single_background_thread(scheduler_class):
)
@pytest.mark.parametrize(
("scheduler_class",),
[pytest.param(SleepScheduler, id="sleep scheduler")],
[
pytest.param(ThreadScheduler, id="thread scheduler"),
pytest.param(GeventScheduler, marks=requires_gevent, id="gevent scheduler"),
],
)
def test_profile_processing(
DictionaryContaining, # noqa: N803
Expand Down