feat: Code locations for metrics (#2526)

DDM wants to show code locations with metrics. Locations are semi-static information: they change infrequently, so they don't need to be reported with every data point.

Sentry expects locations to be reported at least once per day. Because metrics can be backdated, the timestamp used to report a location is the metric bucket's timestamp rounded down to the start of its day (UTC).
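For illustration, the rounding works like this (a minimal sketch using only the standard library; the actual change uses the SDK's `utc_from_timestamp`/`to_timestamp` helpers, as shown in the diff below):

```python
from datetime import datetime, timezone

def start_of_day_timestamp(bucket_timestamp):
    # type: (float) -> int
    # Round a metric bucket's timestamp down to 00:00:00 UTC of its day.
    dt = datetime.fromtimestamp(bucket_timestamp, timezone.utc)
    midnight = dt.replace(hour=0, minute=0, second=0, microsecond=0)
    return int(midnight.timestamp())
```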

The metrics aggregator keeps a cache of previously reported locations. When a location is seen for the first time on a day, it is added to a list of pending locations. On the next flush cycle, all pending locations are sent to Sentry in the same envelope as the metric buckets.
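Conceptually, the dedup-and-flush cycle looks like this (a simplified sketch, without the locking and envelope encoding of the real implementation below):

```python
# Report each (day, metric) location at most once; drain pending on flush.
seen_locations = set()   # {(start_of_day, metric_key), ...} already recorded
pending_locations = {}   # start_of_day -> [(metric_key, location), ...]

def record_location(start_of_day, metric_key, location):
    if (start_of_day, metric_key) not in seen_locations:
        seen_locations.add((start_of_day, metric_key))
        pending_locations.setdefault(start_of_day, []).append((metric_key, location))

def flushable_locations():
    # Called on each flush cycle: hand everything pending to the envelope builder.
    global pending_locations
    locations, pending_locations = pending_locations, {}
    return locations
```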

See: getsentry/relay#2751
Epic: getsentry/sentry#60260
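For reference, each flushed day becomes one `metric_meta` envelope item. Its payload, as assembled by `_encode_locations` in the diff below, looks roughly like this (location keys are illustrative; the exact set comes from `serialize_frame`):

```python
example_metric_meta_payload = {
    "timestamp": 1700784000,  # start of the day, UTC
    "mapping": {
        # MRI format: "<metric type>:<sanitized name>@<unit>"
        "c:button_click@none": [
            {
                "type": "location",
                "abs_path": "/app/main.py",  # illustrative values
                "function": "handle_click",
                "lineno": 42,
            }
        ]
    },
}
```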
---------

Co-authored-by: Armin Ronacher <armin.ronacher@active-4.com>
Co-authored-by: Anton Pirker <anton.pirker@sentry.io>
3 people committed Nov 24, 2023
1 parent 088431e commit a67914c
Showing 5 changed files with 265 additions and 51 deletions.
1 change: 1 addition & 0 deletions sentry_sdk/_types.py
@@ -117,3 +117,4 @@
 FlushedMetricValue = Union[int, float]
 
 BucketKey = Tuple[MetricType, str, MeasurementUnit, MetricTagsInternal]
+MetricMetaKey = Tuple[MetricType, str, MeasurementUnit]
8 changes: 6 additions & 2 deletions sentry_sdk/client.py
@@ -237,11 +237,15 @@ def _capture_envelope(envelope):
             self.session_flusher = SessionFlusher(capture_func=_capture_envelope)
 
             self.metrics_aggregator = None  # type: Optional[MetricsAggregator]
-            if self.options.get("_experiments", {}).get("enable_metrics"):
+            experiments = self.options.get("_experiments", {})
+            if experiments.get("enable_metrics"):
                 from sentry_sdk.metrics import MetricsAggregator
 
                 self.metrics_aggregator = MetricsAggregator(
-                    capture_func=_capture_envelope
+                    capture_func=_capture_envelope,
+                    enable_code_locations=bool(
+                        experiments.get("metric_code_locations")
+                    ),
                 )
 
             max_request_body_size = ("always", "never", "small", "medium")
1 change: 1 addition & 0 deletions sentry_sdk/consts.py
@@ -47,6 +47,7 @@
         "transport_num_pools": Optional[int],
         "enable_metrics": Optional[bool],
         "before_emit_metric": Optional[Callable[[str, MetricTags], bool]],
+        "metric_code_locations": Optional[bool],
     },
     total=False,
 )
138 changes: 113 additions & 25 deletions sentry_sdk/metrics.py
@@ -1,6 +1,7 @@
 import os
 import io
 import re
+import sys
 import threading
 import random
 import time
@@ -11,8 +12,14 @@
 from contextlib import contextmanager
 
 import sentry_sdk
-from sentry_sdk._compat import text_type
-from sentry_sdk.utils import now, nanosecond_time, to_timestamp
+from sentry_sdk._compat import text_type, utc_from_timestamp, iteritems
+from sentry_sdk.utils import (
+    now,
+    nanosecond_time,
+    to_timestamp,
+    serialize_frame,
+    json_dumps,
+)
 from sentry_sdk.envelope import Envelope, Item
 from sentry_sdk.tracing import (
     TRANSACTION_SOURCE_ROUTE,
@@ -24,18 +31,21 @@
 
 if TYPE_CHECKING:
     from typing import Any
+    from typing import Callable
     from typing import Dict
+    from typing import Generator
     from typing import Iterable
-    from typing import Callable
     from typing import List
     from typing import Optional
-    from typing import Generator
+    from typing import Set
     from typing import Tuple
     from typing import Union
 
     from sentry_sdk._types import BucketKey
     from sentry_sdk._types import DurationUnit
     from sentry_sdk._types import FlushedMetricValue
     from sentry_sdk._types import MeasurementUnit
+    from sentry_sdk._types import MetricMetaKey
     from sentry_sdk._types import MetricTagValue
     from sentry_sdk._types import MetricTags
     from sentry_sdk._types import MetricTagsInternal
@@ -46,6 +56,7 @@
 _thread_local = threading.local()
 _sanitize_key = partial(re.compile(r"[^a-zA-Z0-9_/.-]+").sub, "_")
 _sanitize_value = partial(re.compile(r"[^\w\d_:/@\.{}\[\]$-]+", re.UNICODE).sub, "_")
+_set = set  # set is shadowed below
 
 GOOD_TRANSACTION_SOURCES = frozenset(
     [
@@ -57,6 +68,18 @@
 )
 
 
+def get_code_location(stacklevel):
+    # type: (int) -> Optional[Dict[str, Any]]
+    try:
+        frm = sys._getframe(stacklevel + 4)
+    except Exception:
+        return None
+
+    return serialize_frame(
+        frm, include_local_variables=False, include_source_context=False
+    )
+
+
 @contextmanager
 def recursion_protection():
     # type: () -> Generator[bool, None, None]
@@ -247,7 +270,7 @@ def _encode_metrics(flushable_buckets):
     # relay side emission and should not happen commonly.
 
     for timestamp, buckets in flushable_buckets:
-        for bucket_key, metric in buckets.items():
+        for bucket_key, metric in iteritems(buckets):
             metric_type, metric_name, metric_unit, metric_tags = bucket_key
             metric_name = _sanitize_key(metric_name)
             _write(metric_name.encode("utf-8"))
@@ -283,6 +306,20 @@ def _encode_metrics(flushable_buckets):
     return out.getvalue()
 
 
+def _encode_locations(timestamp, code_locations):
+    # type: (int, Iterable[Tuple[MetricMetaKey, Dict[str, Any]]]) -> bytes
+    mapping = {}  # type: Dict[str, List[Any]]
+
+    for key, loc in code_locations:
+        metric_type, name, unit = key
+        mri = "{}:{}@{}".format(metric_type, _sanitize_key(name), unit)
+
+        loc["type"] = "location"
+        mapping.setdefault(mri, []).append(loc)
+
+    return json_dumps({"timestamp": timestamp, "mapping": mapping})
+
+
 METRIC_TYPES = {
     "c": CounterMetric,
     "g": GaugeMetric,
@@ -311,9 +348,13 @@ class MetricsAggregator(object):
     def __init__(
         self,
         capture_func,  # type: Callable[[Envelope], None]
+        enable_code_locations=False,  # type: bool
     ):
         # type: (...) -> None
         self.buckets = {}  # type: Dict[int, Any]
+        self._enable_code_locations = enable_code_locations
+        self._seen_locations = _set()  # type: Set[Tuple[int, MetricMetaKey]]
+        self._pending_locations = {}  # type: Dict[int, List[Tuple[MetricMetaKey, Any]]]
         self._buckets_total_weight = 0
         self._capture_func = capture_func
         self._lock = Lock()
@@ -366,9 +407,7 @@ def _flush_loop(self):
 
     def _flush(self):
         # type: (...) -> None
-        flushable_buckets = self._flushable_buckets()
-        if flushable_buckets:
-            self._emit(flushable_buckets)
+        self._emit(self._flushable_buckets(), self._flushable_locations())
 
     def _flushable_buckets(self):
         # type: (...) -> (Iterable[Tuple[int, Dict[BucketKey, Metric]]])
@@ -385,21 +424,28 @@ def _flushable_buckets(self):
                 self._force_flush = False
             else:
                 flushable_buckets = []
-                for buckets_timestamp, buckets in self.buckets.items():
+                for buckets_timestamp, buckets in iteritems(self.buckets):
                     # If the timestamp of the bucket is newer that the rollup we want to skip it.
                     if buckets_timestamp <= cutoff:
                         flushable_buckets.append((buckets_timestamp, buckets))
 
                 # We will clear the elements while holding the lock, in order to avoid requesting it downstream again.
                 for buckets_timestamp, buckets in flushable_buckets:
-                    for _, metric in buckets.items():
+                    for _, metric in iteritems(buckets):
                         weight_to_remove += metric.weight
                     del self.buckets[buckets_timestamp]
 
                 self._buckets_total_weight -= weight_to_remove
 
         return flushable_buckets
 
+    def _flushable_locations(self):
+        # type: (...) -> Dict[int, List[Tuple[MetricMetaKey, Dict[str, Any]]]]
+        with self._lock:
+            locations = self._pending_locations
+            self._pending_locations = {}
+        return locations
+
     @metrics_noop
     def add(
         self,
@@ -409,6 +455,7 @@ def add(
         unit,  # type: MeasurementUnit
         tags,  # type: Optional[MetricTags]
        timestamp=None,  # type: Optional[Union[float, datetime]]
+        stacklevel=0,  # type: int
     ):
         # type: (...) -> None
         if not self._ensure_thread() or self._flusher is None:
@@ -441,6 +488,24 @@ def add(
 
         self._buckets_total_weight += metric.weight - previous_weight
 
+        # Store code location once per metric and per day (of bucket timestamp)
+        if self._enable_code_locations:
+            meta_key = (ty, key, unit)
+            start_of_day = utc_from_timestamp(timestamp).replace(
+                hour=0, minute=0, second=0, microsecond=0, tzinfo=None
+            )
+            start_of_day = int(to_timestamp(start_of_day))
+
+            if (start_of_day, meta_key) not in self._seen_locations:
+                self._seen_locations.add((start_of_day, meta_key))
+                loc = get_code_location(stacklevel)
+                if loc is not None:
+                    # Group metadata by day to make flushing more efficient.
+                    # There needs to be one envelope item per timestamp.
+                    self._pending_locations.setdefault(start_of_day, []).append(
+                        (meta_key, loc)
+                    )
+
         # Given the new weight we consider whether we want to force flush.
         self._consider_force_flush()
@@ -471,13 +536,23 @@ def _consider_force_flush(self):
     def _emit(
         self,
         flushable_buckets,  # type: (Iterable[Tuple[int, Dict[BucketKey, Metric]]])
+        code_locations,  # type: Dict[int, List[Tuple[MetricMetaKey, Dict[str, Any]]]]
     ):
-        # type: (...) -> Envelope
-        encoded_metrics = _encode_metrics(flushable_buckets)
-        metric_item = Item(payload=encoded_metrics, type="statsd")
-        envelope = Envelope(items=[metric_item])
-        self._capture_func(envelope)
-        return envelope
+        # type: (...) -> Optional[Envelope]
+        envelope = Envelope()
+
+        if flushable_buckets:
+            encoded_metrics = _encode_metrics(flushable_buckets)
+            envelope.add_item(Item(payload=encoded_metrics, type="statsd"))
+
+        for timestamp, locations in iteritems(code_locations):
+            encoded_locations = _encode_locations(timestamp, locations)
+            envelope.add_item(Item(payload=encoded_locations, type="metric_meta"))
+
+        if envelope.items:
+            self._capture_func(envelope)
+            return envelope
+        return None
 
     def _serialize_tags(
         self, tags  # type: Optional[MetricTags]
@@ -487,7 +562,7 @@ def _serialize_tags(
             return ()
 
         rv = []
-        for key, value in tags.items():
+        for key, value in iteritems(tags):
             # If the value is a collection, we want to flatten it.
             if isinstance(value, (list, tuple)):
                 for inner_value in value:
@@ -536,12 +611,13 @@ def incr(
     unit="none",  # type: MeasurementUnit
     tags=None,  # type: Optional[MetricTags]
     timestamp=None,  # type: Optional[Union[float, datetime]]
+    stacklevel=0,  # type: int
 ):
     # type: (...) -> None
     """Increments a counter."""
     aggregator, tags = _get_aggregator_and_update_tags(key, tags)
     if aggregator is not None:
-        aggregator.add("c", key, value, unit, tags, timestamp)
+        aggregator.add("c", key, value, unit, tags, timestamp, stacklevel)
 
 
 class _Timing(object):
@@ -552,6 +628,7 @@ def __init__(
         timestamp,  # type: Optional[Union[float, datetime]]
         value,  # type: Optional[float]
         unit,  # type: DurationUnit
+        stacklevel,  # type: int
     ):
         # type: (...) -> None
         self.key = key
@@ -560,6 +637,7 @@ def __init__(
         self.value = value
         self.unit = unit
         self.entered = None  # type: Optional[float]
+        self.stacklevel = stacklevel
 
     def _validate_invocation(self, context):
         # type: (str) -> None
@@ -579,7 +657,9 @@ def __exit__(self, exc_type, exc_value, tb):
         aggregator, tags = _get_aggregator_and_update_tags(self.key, self.tags)
         if aggregator is not None:
             elapsed = TIMING_FUNCTIONS[self.unit]() - self.entered  # type: ignore
-            aggregator.add("d", self.key, elapsed, self.unit, tags, self.timestamp)
+            aggregator.add(
+                "d", self.key, elapsed, self.unit, tags, self.timestamp, self.stacklevel
+            )
 
     def __call__(self, f):
         # type: (Any) -> Any
@@ -589,7 +669,11 @@ def __call__(self, f):
         def timed_func(*args, **kwargs):
             # type: (*Any, **Any) -> Any
             with timing(
-                key=self.key, tags=self.tags, timestamp=self.timestamp, unit=self.unit
+                key=self.key,
+                tags=self.tags,
+                timestamp=self.timestamp,
+                unit=self.unit,
+                stacklevel=self.stacklevel + 1,
             ):
                 return f(*args, **kwargs)
 
@@ -602,6 +686,7 @@ def timing(
     unit="second",  # type: DurationUnit
     tags=None,  # type: Optional[MetricTags]
     timestamp=None,  # type: Optional[Union[float, datetime]]
+    stacklevel=0,  # type: int
 ):
     # type: (...) -> _Timing
     """Emits a distribution with the time it takes to run the given code block.
@@ -615,8 +700,8 @@ def timing(
     if value is not None:
         aggregator, tags = _get_aggregator_and_update_tags(key, tags)
         if aggregator is not None:
-            aggregator.add("d", key, value, unit, tags, timestamp)
-    return _Timing(key, tags, timestamp, value, unit)
+            aggregator.add("d", key, value, unit, tags, timestamp, stacklevel)
+    return _Timing(key, tags, timestamp, value, unit, stacklevel)
 
 
 def distribution(
@@ -625,12 +710,13 @@ def distribution(
     unit="none",  # type: MeasurementUnit
     tags=None,  # type: Optional[MetricTags]
     timestamp=None,  # type: Optional[Union[float, datetime]]
+    stacklevel=0,  # type: int
 ):
     # type: (...) -> None
     """Emits a distribution."""
     aggregator, tags = _get_aggregator_and_update_tags(key, tags)
     if aggregator is not None:
-        aggregator.add("d", key, value, unit, tags, timestamp)
+        aggregator.add("d", key, value, unit, tags, timestamp, stacklevel)
 
 
 def set(
@@ -639,12 +725,13 @@ def set(
     unit="none",  # type: MeasurementUnit
     tags=None,  # type: Optional[MetricTags]
     timestamp=None,  # type: Optional[Union[float, datetime]]
+    stacklevel=0,  # type: int
 ):
     # type: (...) -> None
     """Emits a set."""
     aggregator, tags = _get_aggregator_and_update_tags(key, tags)
     if aggregator is not None:
-        aggregator.add("s", key, value, unit, tags, timestamp)
+        aggregator.add("s", key, value, unit, tags, timestamp, stacklevel)
 
 
 def gauge(
@@ -653,9 +740,10 @@ def gauge(
     unit="none",  # type: MetricValue
     tags=None,  # type: Optional[MetricTags]
     timestamp=None,  # type: Optional[Union[float, datetime]]
+    stacklevel=0,  # type: int
 ):
     # type: (...) -> None
     """Emits a gauge."""
     aggregator, tags = _get_aggregator_and_update_tags(key, tags)
     if aggregator is not None:
-        aggregator.add("g", key, value, unit, tags, timestamp)
+        aggregator.add("g", key, value, unit, tags, timestamp, stacklevel)
