Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(metrics): Make metrics work with gevent #2694

Merged
merged 18 commits into from Feb 6, 2024
19 changes: 7 additions & 12 deletions sentry_sdk/client.py
Expand Up @@ -15,7 +15,6 @@
get_default_release,
handle_in_app,
logger,
is_gevent,
)
from sentry_sdk.serializer import serialize
from sentry_sdk.tracing import trace, has_tracing_enabled
Expand Down Expand Up @@ -251,18 +250,14 @@ def _capture_envelope(envelope):
self.metrics_aggregator = None # type: Optional[MetricsAggregator]
experiments = self.options.get("_experiments", {})
if experiments.get("enable_metrics", True):
if is_gevent():
logger.warning("Metrics currently not supported with gevent.")
from sentry_sdk.metrics import MetricsAggregator

else:
from sentry_sdk.metrics import MetricsAggregator

self.metrics_aggregator = MetricsAggregator(
capture_func=_capture_envelope,
enable_code_locations=bool(
experiments.get("metric_code_locations", True)
),
)
self.metrics_aggregator = MetricsAggregator(
capture_func=_capture_envelope,
enable_code_locations=bool(
experiments.get("metric_code_locations", True)
),
)

max_request_body_size = ("always", "never", "small", "medium")
if self.options["max_request_body_size"] not in max_request_body_size:
Expand Down
30 changes: 23 additions & 7 deletions sentry_sdk/metrics.py
Expand Up @@ -2,14 +2,13 @@
import io
import re
import sys
import threading
import random
import time
import zlib
from contextlib import contextmanager
from datetime import datetime
from functools import wraps, partial
from threading import Event, Lock, Thread
from contextlib import contextmanager
from threading import Event, Lock, Thread, local

import sentry_sdk
from sentry_sdk._compat import text_type, utc_from_timestamp, iteritems
Expand All @@ -19,6 +18,7 @@
to_timestamp,
serialize_frame,
json_dumps,
is_gevent,
)
from sentry_sdk.envelope import Envelope, Item
from sentry_sdk.tracing import (
Expand Down Expand Up @@ -53,7 +53,12 @@
from sentry_sdk._types import MetricValue


_thread_local = threading.local()
# if is_gevent():
# from gevent.local import local # type: ignore
# from gevent.event import Event # type: ignore


_thread_local = local()
_sanitize_key = partial(re.compile(r"[^a-zA-Z0-9_/.-]+").sub, "_")
_sanitize_value = partial(re.compile(r"[^\w\d_:/@\.{}\[\]$-]+", re.UNICODE).sub, "_")
_set = set # set is shadowed below
Expand Down Expand Up @@ -435,15 +440,26 @@
"""
if not self._running:
return False

pid = os.getpid()
if self._flusher_pid == pid:
return True

with self._lock:
self._flusher_pid = pid
self._flusher = Thread(target=self._flush_loop)
self._flusher.daemon = True

if not is_gevent():
self._flusher = Thread(target=self._flush_loop)
self._flusher.daemon = True
start_flusher = self._flusher.start
else:
from gevent.threadpool import ThreadPool # type: ignore

Check warning on line 456 in sentry_sdk/metrics.py

View check run for this annotation

Codecov / codecov/patch

sentry_sdk/metrics.py#L456

Added line #L456 was not covered by tests

self._flusher = ThreadPool(1)
start_flusher = partial(self._flusher.spawn, self._flush_loop)

Check warning on line 459 in sentry_sdk/metrics.py

View check run for this annotation

Codecov / codecov/patch

sentry_sdk/metrics.py#L458-L459

Added lines #L458 - L459 were not covered by tests

try:
self._flusher.start()
start_flusher()
except RuntimeError:
# Unfortunately at this point the interpreter is in a state that no
# longer allows us to spawn a thread and we have to bail.
Expand Down
2 changes: 1 addition & 1 deletion sentry_sdk/profiler.py
Expand Up @@ -490,7 +490,7 @@ def _set_initial_sampling_decision(self, sampling_context):
# type: (SamplingContext) -> None
"""
Sets the profile's sampling decision according to the following
precdence rules:
precedence rules:

1. If the transaction to be profiled is not sampled, that decision
will be used, regardless of anything else.
Expand Down
102 changes: 55 additions & 47 deletions tests/test_metrics.py
Expand Up @@ -13,13 +13,6 @@
except ImportError:
import mock # python < 3.3

try:
import gevent
except ImportError:
gevent = None

requires_gevent = pytest.mark.skipif(gevent is None, reason="gevent not enabled")


def parse_metrics(bytes):
rv = []
Expand Down Expand Up @@ -52,7 +45,8 @@ def parse_metrics(bytes):
return rv


def test_incr(sentry_init, capture_envelopes):
@pytest.mark.forked
def test_incr(sentry_init, capture_envelopes, maybe_monkeypatched_threading):
sentry_init(
release="fun-release",
environment="not-fun-env",
Expand Down Expand Up @@ -103,7 +97,8 @@ def test_incr(sentry_init, capture_envelopes):
}


def test_timing(sentry_init, capture_envelopes):
@pytest.mark.forked
def test_timing(sentry_init, capture_envelopes, maybe_monkeypatched_threading):
sentry_init(
release="fun-release@1.0.0",
environment="not-fun-env",
Expand Down Expand Up @@ -162,7 +157,10 @@ def test_timing(sentry_init, capture_envelopes):
)


def test_timing_decorator(sentry_init, capture_envelopes):
@pytest.mark.forked
def test_timing_decorator(
sentry_init, capture_envelopes, maybe_monkeypatched_threading
):
sentry_init(
release="fun-release@1.0.0",
environment="not-fun-env",
Expand Down Expand Up @@ -254,7 +252,8 @@ def amazing_nano():
assert line.strip() == "assert amazing() == 42"


def test_timing_basic(sentry_init, capture_envelopes):
@pytest.mark.forked
def test_timing_basic(sentry_init, capture_envelopes, maybe_monkeypatched_threading):
sentry_init(
release="fun-release@1.0.0",
environment="not-fun-env",
Expand Down Expand Up @@ -308,7 +307,8 @@ def test_timing_basic(sentry_init, capture_envelopes):
}


def test_distribution(sentry_init, capture_envelopes):
@pytest.mark.forked
def test_distribution(sentry_init, capture_envelopes, maybe_monkeypatched_threading):
sentry_init(
release="fun-release@1.0.0",
environment="not-fun-env",
Expand Down Expand Up @@ -369,7 +369,8 @@ def test_distribution(sentry_init, capture_envelopes):
)


def test_set(sentry_init, capture_envelopes):
@pytest.mark.forked
def test_set(sentry_init, capture_envelopes, maybe_monkeypatched_threading):
sentry_init(
release="fun-release@1.0.0",
environment="not-fun-env",
Expand Down Expand Up @@ -421,7 +422,8 @@ def test_set(sentry_init, capture_envelopes):
}


def test_gauge(sentry_init, capture_envelopes):
@pytest.mark.forked
def test_gauge(sentry_init, capture_envelopes, maybe_monkeypatched_threading):
sentry_init(
release="fun-release@1.0.0",
environment="not-fun-env",
Expand Down Expand Up @@ -453,6 +455,7 @@ def test_gauge(sentry_init, capture_envelopes):
}


@pytest.mark.forked
def test_multiple(sentry_init, capture_envelopes):
sentry_init(
release="fun-release@1.0.0",
Expand Down Expand Up @@ -506,7 +509,10 @@ def test_multiple(sentry_init, capture_envelopes):
}


def test_transaction_name(sentry_init, capture_envelopes):
@pytest.mark.forked
def test_transaction_name(
sentry_init, capture_envelopes, maybe_monkeypatched_threading
):
sentry_init(
release="fun-release@1.0.0",
environment="not-fun-env",
Expand Down Expand Up @@ -543,8 +549,11 @@ def test_transaction_name(sentry_init, capture_envelopes):
}


@pytest.mark.forked
@pytest.mark.parametrize("sample_rate", [1.0, None])
def test_metric_summaries(sentry_init, capture_envelopes, sample_rate):
def test_metric_summaries(
sentry_init, capture_envelopes, sample_rate, maybe_monkeypatched_threading
):
sentry_init(
release="fun-release@1.0.0",
environment="not-fun-env",
Expand Down Expand Up @@ -650,7 +659,10 @@ def test_metric_summaries(sentry_init, capture_envelopes, sample_rate):
}


def test_metrics_summary_disabled(sentry_init, capture_envelopes):
@pytest.mark.forked
def test_metrics_summary_disabled(
sentry_init, capture_envelopes, maybe_monkeypatched_threading
):
sentry_init(
release="fun-release@1.0.0",
environment="not-fun-env",
Expand Down Expand Up @@ -691,7 +703,10 @@ def test_metrics_summary_disabled(sentry_init, capture_envelopes):
assert "_metrics_summary" not in t["spans"][0]


def test_metrics_summary_filtered(sentry_init, capture_envelopes):
@pytest.mark.forked
def test_metrics_summary_filtered(
sentry_init, capture_envelopes, maybe_monkeypatched_threading
):
def should_summarize_metric(key, tags):
return key == "foo"

Expand Down Expand Up @@ -757,7 +772,10 @@ def should_summarize_metric(key, tags):
} in t["d:foo@second"]


def test_tag_normalization(sentry_init, capture_envelopes):
@pytest.mark.forked
def test_tag_normalization(
sentry_init, capture_envelopes, maybe_monkeypatched_threading
):
sentry_init(
release="fun-release@1.0.0",
environment="not-fun-env",
Expand Down Expand Up @@ -801,7 +819,10 @@ def test_tag_normalization(sentry_init, capture_envelopes):
# fmt: on


def test_before_emit_metric(sentry_init, capture_envelopes):
@pytest.mark.forked
def test_before_emit_metric(
sentry_init, capture_envelopes, maybe_monkeypatched_threading
):
def before_emit(key, tags):
if key == "removed-metric":
return False
Expand Down Expand Up @@ -841,7 +862,10 @@ def before_emit(key, tags):
}


def test_aggregator_flush(sentry_init, capture_envelopes):
@pytest.mark.forked
def test_aggregator_flush(
sentry_init, capture_envelopes, maybe_monkeypatched_threading
):
sentry_init(
release="fun-release@1.0.0",
environment="not-fun-env",
Expand All @@ -858,7 +882,10 @@ def test_aggregator_flush(sentry_init, capture_envelopes):
assert Hub.current.client.metrics_aggregator.buckets == {}


def test_tag_serialization(sentry_init, capture_envelopes):
@pytest.mark.forked
def test_tag_serialization(
sentry_init, capture_envelopes, maybe_monkeypatched_threading
):
sentry_init(
release="fun-release",
environment="not-fun-env",
Expand Down Expand Up @@ -895,7 +922,10 @@ def test_tag_serialization(sentry_init, capture_envelopes):
}


def test_flush_recursion_protection(sentry_init, capture_envelopes, monkeypatch):
@pytest.mark.forked
def test_flush_recursion_protection(
sentry_init, capture_envelopes, monkeypatch, maybe_monkeypatched_threading
):
sentry_init(
release="fun-release",
environment="not-fun-env",
Expand Down Expand Up @@ -924,8 +954,9 @@ def bad_capture_envelope(*args, **kwargs):
assert m[0][1] == "counter@none"


@pytest.mark.forked
def test_flush_recursion_protection_background_flush(
sentry_init, capture_envelopes, monkeypatch
sentry_init, capture_envelopes, monkeypatch, maybe_monkeypatched_threading
):
monkeypatch.setattr(metrics.MetricsAggregator, "FLUSHER_SLEEP_TIME", 0.1)
sentry_init(
Expand Down Expand Up @@ -954,26 +985,3 @@ def bad_capture_envelope(*args, **kwargs):
m = parse_metrics(envelope.items[0].payload.get_bytes())
assert len(m) == 1
assert m[0][1] == "counter@none"


@pytest.mark.forked
@requires_gevent
def test_no_metrics_with_gevent(sentry_init, capture_envelopes):
from gevent import monkey

monkey.patch_all()

sentry_init(
release="fun-release",
environment="not-fun-env",
_experiments={"enable_metrics": True, "metric_code_locations": True},
)
ts = time.time()
envelopes = capture_envelopes()

metrics.incr("foobar", 1.0, tags={"foo": "bar", "blub": "blah"}, timestamp=ts)
metrics.incr("foobar", 2.0, tags={"foo": "bar", "blub": "blah"}, timestamp=ts)
Hub.current.flush()

assert Hub.current.client.metrics_aggregator is None
assert len(envelopes) == 0