Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

feat(metrics): Make metrics work with gevent #2694

Merged
merged 18 commits
Feb 6, 2024
19 changes: 7 additions & 12 deletions sentry_sdk/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
get_default_release,
handle_in_app,
logger,
is_gevent,
)
from sentry_sdk.serializer import serialize
from sentry_sdk.tracing import trace, has_tracing_enabled
Expand Down Expand Up @@ -251,18 +250,14 @@ def _capture_envelope(envelope):
self.metrics_aggregator = None # type: Optional[MetricsAggregator]
experiments = self.options.get("_experiments", {})
if experiments.get("enable_metrics", True):
if is_gevent():
logger.warning("Metrics currently not supported with gevent.")
from sentry_sdk.metrics import MetricsAggregator

else:
from sentry_sdk.metrics import MetricsAggregator

self.metrics_aggregator = MetricsAggregator(
capture_func=_capture_envelope,
enable_code_locations=bool(
experiments.get("metric_code_locations", True)
),
)
self.metrics_aggregator = MetricsAggregator(
capture_func=_capture_envelope,
enable_code_locations=bool(
experiments.get("metric_code_locations", True)
),
)

max_request_body_size = ("always", "never", "small", "medium")
if self.options["max_request_body_size"] not in max_request_body_size:
Expand Down
55 changes: 43 additions & 12 deletions sentry_sdk/metrics.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,24 @@
import os
import io
import os
import random
import re
import sys
import threading
import random
import time
import zlib
from contextlib import contextmanager
from datetime import datetime
from functools import wraps, partial
from threading import Event, Lock, Thread
from contextlib import contextmanager

import sentry_sdk
from sentry_sdk._compat import text_type, utc_from_timestamp, iteritems
from sentry_sdk._compat import PY2, text_type, utc_from_timestamp, iteritems
from sentry_sdk.utils import (
now,
nanosecond_time,
to_timestamp,
serialize_frame,
json_dumps,
is_gevent,
)
from sentry_sdk.envelope import Envelope, Item
from sentry_sdk.tracing import (
Expand Down Expand Up @@ -53,6 +53,17 @@
from sentry_sdk._types import MetricValue


try:
from gevent.monkey import get_original # type: ignore
from gevent.threadpool import ThreadPool # type: ignore
except ImportError:
import importlib

def get_original(module, name):
# type: (str, str) -> Any
return getattr(importlib.import_module(module), name)


_thread_local = threading.local()
_sanitize_key = partial(re.compile(r"[^a-zA-Z0-9_/.-]+").sub, "_")
_sanitize_value = partial(re.compile(r"[^\w\d_:/@\.{}\[\]$-]+", re.UNICODE).sub, "_")
Expand Down Expand Up @@ -411,20 +422,30 @@
self._pending_locations = {} # type: Dict[int, List[Tuple[MetricMetaKey, Any]]]
self._buckets_total_weight = 0
self._capture_func = capture_func
self._lock = Lock()
self._running = True
self._flush_event = Event()
self._lock = threading.Lock()

if is_gevent() and PY2:
# get_original on threading.Event in Python 2 incorrectly returns
# the gevent-patched class. Luckily, threading.Event is just an alias
# for threading._Event in Python 2, and get_original on
# threading._Event correctly gets us the stdlib original.
event_cls = get_original("threading", "_Event")

Check warning on line 433 in sentry_sdk/metrics.py

View check run for this annotation

Codecov / codecov/patch

sentry_sdk/metrics.py#L433

Added line #L433 was not covered by tests
else:
event_cls = get_original("threading", "Event")
self._flush_event = event_cls() # type: threading.Event

self._force_flush = False

# The aggregator shifts it's flushing by up to an entire rollup window to
# The aggregator shifts its flushing by up to an entire rollup window to
# avoid multiple clients trampling on end of a 10 second window as all the
# buckets are anchored to multiples of ROLLUP seconds. We randomize this
# number once per aggregator boot to achieve some level of offsetting
# across a fleet of deployed SDKs. Relay itself will also apply independent
# jittering.
self._flush_shift = random.random() * self.ROLLUP_IN_SECONDS

self._flusher = None # type: Optional[Thread]
self._flusher = None # type: Optional[Union[threading.Thread, ThreadPool]]
self._flusher_pid = None # type: Optional[int]
self._ensure_thread()

Expand All @@ -435,20 +456,30 @@
"""
if not self._running:
return False

pid = os.getpid()
if self._flusher_pid == pid:
return True

with self._lock:
self._flusher_pid = pid
self._flusher = Thread(target=self._flush_loop)
self._flusher.daemon = True

if not is_gevent():
self._flusher = threading.Thread(target=self._flush_loop)
self._flusher.daemon = True
start_flusher = self._flusher.start
else:
self._flusher = ThreadPool(1)
start_flusher = partial(self._flusher.spawn, func=self._flush_loop)

try:
self._flusher.start()
start_flusher()
except RuntimeError:
# Unfortunately at this point the interpreter is in a state that no
# longer allows us to spawn a thread and we have to bail.
self._running = False
return False

return True

def _flush_loop(self):
Expand Down
2 changes: 1 addition & 1 deletion sentry_sdk/profiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ def _set_initial_sampling_decision(self, sampling_context):
# type: (SamplingContext) -> None
"""
Sets the profile's sampling decision according to the following
precdence rules:
precedence rules:

1. If the transaction to be profiled is not sampled, that decision
will be used, regardless of anything else.
Expand Down