Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(metrics): Make metrics work with gevent #2694

Merged
merged 18 commits into from Feb 6, 2024
19 changes: 7 additions & 12 deletions sentry_sdk/client.py
Expand Up @@ -15,7 +15,6 @@
get_default_release,
handle_in_app,
logger,
is_gevent,
)
from sentry_sdk.serializer import serialize
from sentry_sdk.tracing import trace, has_tracing_enabled
Expand Down Expand Up @@ -251,18 +250,14 @@ def _capture_envelope(envelope):
self.metrics_aggregator = None # type: Optional[MetricsAggregator]
experiments = self.options.get("_experiments", {})
if experiments.get("enable_metrics", True):
if is_gevent():
logger.warning("Metrics currently not supported with gevent.")
from sentry_sdk.metrics import MetricsAggregator

else:
from sentry_sdk.metrics import MetricsAggregator

self.metrics_aggregator = MetricsAggregator(
capture_func=_capture_envelope,
enable_code_locations=bool(
experiments.get("metric_code_locations", True)
),
)
self.metrics_aggregator = MetricsAggregator(
capture_func=_capture_envelope,
enable_code_locations=bool(
experiments.get("metric_code_locations", True)
),
)

max_request_body_size = ("always", "never", "small", "medium")
if self.options["max_request_body_size"] not in max_request_body_size:
Expand Down
72 changes: 50 additions & 22 deletions sentry_sdk/metrics.py
@@ -1,24 +1,25 @@
import os
import io
import os
import random
import re
import sys
import threading
import random
import time
import zlib
from contextlib import contextmanager
from datetime import datetime
from functools import wraps, partial
from threading import Event, Lock, Thread
from contextlib import contextmanager

import sentry_sdk
from sentry_sdk._compat import text_type, utc_from_timestamp, iteritems
from sentry_sdk._compat import PY2, text_type, utc_from_timestamp, iteritems
from sentry_sdk.utils import (
ContextVar,
now,
nanosecond_time,
to_timestamp,
serialize_frame,
json_dumps,
is_gevent,
)
from sentry_sdk.envelope import Envelope, Item
from sentry_sdk.tracing import (
Expand Down Expand Up @@ -53,7 +54,18 @@
from sentry_sdk._types import MetricValue


_thread_local = threading.local()
# Prefer gevent's helpers when gevent is installed: `get_original` returns the
# original (pre-monkey-patch) stdlib attribute, and `ThreadPool` gives a real
# OS-thread pool that cooperates with the gevent event loop.
try:
    from gevent.monkey import get_original  # type: ignore
    from gevent.threadpool import ThreadPool  # type: ignore
except ImportError:
    import importlib

    def get_original(module, name):
        # type: (str, str) -> Any
        # Fallback when gevent is absent: nothing is monkey-patched, so a
        # plain attribute lookup on the imported module already yields the
        # original object. (ThreadPool is deliberately left undefined here;
        # callers are expected to guard its use with an is_gevent() check.)
        return getattr(importlib.import_module(module), name)


_in_metrics = ContextVar("in_metrics")
_sanitize_key = partial(re.compile(r"[^a-zA-Z0-9_/.-]+").sub, "_")
_sanitize_value = partial(re.compile(r"[^\w\d_:/@\.{}\[\]$-]+", re.UNICODE).sub, "_")
_set = set # set is shadowed below
Expand Down Expand Up @@ -84,15 +96,12 @@ def get_code_location(stacklevel):
def recursion_protection():
# type: () -> Generator[bool, None, None]
"""Enters recursion protection and returns the old flag."""
old_in_metrics = _in_metrics.get(False)
_in_metrics.set(True)
try:
in_metrics = _thread_local.in_metrics
except AttributeError:
in_metrics = False
_thread_local.in_metrics = True
try:
yield in_metrics
yield old_in_metrics
finally:
_thread_local.in_metrics = in_metrics
_in_metrics.set(old_in_metrics)


def metrics_noop(func):
Expand Down Expand Up @@ -411,20 +420,30 @@ def __init__(
self._pending_locations = {} # type: Dict[int, List[Tuple[MetricMetaKey, Any]]]
self._buckets_total_weight = 0
self._capture_func = capture_func
self._lock = Lock()
self._running = True
self._flush_event = Event()
self._lock = threading.Lock()

if is_gevent() and PY2:
# get_original on threading.Event in Python 2 incorrectly returns
# the gevent-patched class. Luckily, threading.Event is just an alias
# for threading._Event in Python 2, and get_original on
# threading._Event correctly gets us the stdlib original.
event_cls = get_original("threading", "_Event")
else:
event_cls = get_original("threading", "Event")
self._flush_event = event_cls() # type: threading.Event

self._force_flush = False

# The aggregator shifts it's flushing by up to an entire rollup window to
# The aggregator shifts its flushing by up to an entire rollup window to
# avoid multiple clients trampling on end of a 10 second window as all the
# buckets are anchored to multiples of ROLLUP seconds. We randomize this
# number once per aggregator boot to achieve some level of offsetting
# across a fleet of deployed SDKs. Relay itself will also apply independent
# jittering.
self._flush_shift = random.random() * self.ROLLUP_IN_SECONDS

self._flusher = None # type: Optional[Thread]
self._flusher = None # type: Optional[Union[threading.Thread, ThreadPool]]
self._flusher_pid = None # type: Optional[int]
self._ensure_thread()

Expand All @@ -435,25 +454,35 @@ def _ensure_thread(self):
"""
if not self._running:
return False

pid = os.getpid()
if self._flusher_pid == pid:
return True

with self._lock:
self._flusher_pid = pid
self._flusher = Thread(target=self._flush_loop)
self._flusher.daemon = True

if not is_gevent():
self._flusher = threading.Thread(target=self._flush_loop)
self._flusher.daemon = True
start_flusher = self._flusher.start
else:
self._flusher = ThreadPool(1)
start_flusher = partial(self._flusher.spawn, func=self._flush_loop)

try:
self._flusher.start()
start_flusher()
except RuntimeError:
# Unfortunately at this point the interpreter is in a state that no
# longer allows us to spawn a thread and we have to bail.
self._running = False
return False

return True

def _flush_loop(self):
# type: (...) -> None
_thread_local.in_metrics = True
_in_metrics.set(True)
while self._running or self._force_flush:
self._flush()
if self._running:
Expand Down Expand Up @@ -608,7 +637,6 @@ def kill(self):

self._running = False
self._flush_event.set()
self._flusher.join()
self._flusher = None

@metrics_noop
Expand Down
2 changes: 1 addition & 1 deletion sentry_sdk/profiler.py
Expand Up @@ -490,7 +490,7 @@ def _set_initial_sampling_decision(self, sampling_context):
# type: (SamplingContext) -> None
"""
Sets the profile's sampling decision according to the following
precdence rules:
precedence rules:

1. If the transaction to be profiled is not sampled, that decision
will be used, regardless of anything else.
Expand Down