Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: metric span summaries #2522

Merged
merged 16 commits into from
Nov 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions sentry_sdk/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
"transport_zlib_compression_level": Optional[int],
"transport_num_pools": Optional[int],
"enable_metrics": Optional[bool],
"metrics_summary_sample_rate": Optional[float],
"should_summarize_metric": Optional[Callable[[str, MetricTags], bool]],
"before_emit_metric": Optional[Callable[[str, MetricTags], bool]],
"metric_code_locations": Optional[bool],
},
Expand Down
212 changes: 167 additions & 45 deletions sentry_sdk/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,58 @@
}


class LocalAggregator(object):
    """Span-local metric summary: tracks min/max/count/sum per metric bucket.

    One instance is attached to a span; every metric emitted while sampled
    folds into a gauge-like summary keyed by ``"<type>:<key>@<unit>"`` plus
    the serialized tags.
    """

    __slots__ = ("_measurements",)

    def __init__(self):
        # type: (...) -> None
        # Maps (export_key, serialized_tags) -> (min, max, count, sum).
        self._measurements = (
            {}
        )  # type: Dict[Tuple[str, MetricTagsInternal], Tuple[float, float, int, float]]

    def add(
        self,
        ty,  # type: MetricType
        key,  # type: str
        value,  # type: float
        unit,  # type: MeasurementUnit
        tags,  # type: MetricTagsInternal
    ):
        # type: (...) -> None
        """Fold a single observed value into the running summary."""
        bucket_key = ("%s:%s@%s" % (ty, key, unit), tags)

        current = self._measurements.get(bucket_key)
        if current is None:
            # First observation for this bucket.
            summary = (value, value, 1, value)
        else:
            cur_min, cur_max, cur_count, cur_sum = current
            summary = (
                min(cur_min, value),
                max(cur_max, value),
                cur_count + 1,
                cur_sum + value,
            )
        self._measurements[bucket_key] = summary

    def to_json(self):
        # type: (...) -> Dict[str, Any]
        """Render the collected summaries as a JSON-serializable mapping."""
        return {
            export_key: {
                "tags": _tags_to_dict(tags),
                "min": v_min,
                "max": v_max,
                "count": v_count,
                "sum": v_sum,
            }
            for (export_key, tags), (
                v_min,
                v_max,
                v_count,
                v_sum,
            ) in self._measurements.items()
        }


class MetricsAggregator(object):
ROLLUP_IN_SECONDS = 10.0
MAX_WEIGHT = 100000
Expand Down Expand Up @@ -455,11 +507,12 @@
unit, # type: MeasurementUnit
tags, # type: Optional[MetricTags]
timestamp=None, # type: Optional[Union[float, datetime]]
local_aggregator=None, # type: Optional[LocalAggregator]
stacklevel=0, # type: int
):
# type: (...) -> None
if not self._ensure_thread() or self._flusher is None:
return
return None

Check warning on line 515 in sentry_sdk/metrics.py

View check run for this annotation

Codecov / codecov/patch

sentry_sdk/metrics.py#L515

Added line #L515 was not covered by tests

if timestamp is None:
timestamp = time.time()
Expand All @@ -469,11 +522,12 @@
bucket_timestamp = int(
(timestamp // self.ROLLUP_IN_SECONDS) * self.ROLLUP_IN_SECONDS
)
serialized_tags = _serialize_tags(tags)
bucket_key = (
ty,
key,
unit,
self._serialize_tags(tags),
serialized_tags,
)

with self._lock:
Expand All @@ -486,7 +540,8 @@
metric = local_buckets[bucket_key] = METRIC_TYPES[ty](value)
previous_weight = 0

self._buckets_total_weight += metric.weight - previous_weight
added = metric.weight - previous_weight
self._buckets_total_weight += added

# Store code location once per metric and per day (of bucket timestamp)
if self._enable_code_locations:
Expand All @@ -509,6 +564,10 @@
# Given the new weight we consider whether we want to force flush.
self._consider_force_flush()

if local_aggregator is not None:
local_value = float(added if ty == "s" else value)
local_aggregator.add(ty, key, local_value, unit, serialized_tags)

def kill(self):
# type: (...) -> None
if self._flusher is None:
Expand Down Expand Up @@ -554,55 +613,87 @@
return envelope
return None

def _serialize_tags(
    self, tags  # type: Optional[MetricTags]
):
    # type: (...) -> MetricTagsInternal
    # NOTE(review): this is the pre-refactor method form shown on the
    # removed side of the diff; the replacement is a module-level function.
    # Normalizes user-supplied tags into a sorted tuple of string pairs;
    # returns () for falsy input.
    if not tags:
        return ()

    rv = []
    for key, value in iteritems(tags):
        # If the value is a collection, we want to flatten it.
        if isinstance(value, (list, tuple)):
            for inner_value in value:
                if inner_value is not None:
                    rv.append((key, text_type(inner_value)))
        elif value is not None:
            # None values are dropped entirely.
            rv.append((key, text_type(value)))

    # It's very important to sort the tags in order to obtain the
    # same bucket key.
    return tuple(sorted(rv))
def _serialize_tags(
tags, # type: Optional[MetricTags]
):
# type: (...) -> MetricTagsInternal
if not tags:
return ()

Check warning on line 622 in sentry_sdk/metrics.py

View check run for this annotation

Codecov / codecov/patch

sentry_sdk/metrics.py#L622

Added line #L622 was not covered by tests

rv = []
for key, value in iteritems(tags):
# If the value is a collection, we want to flatten it.
if isinstance(value, (list, tuple)):
for inner_value in value:
if inner_value is not None:
rv.append((key, text_type(inner_value)))
elif value is not None:
rv.append((key, text_type(value)))

# It's very important to sort the tags in order to obtain the
# same bucket key.
return tuple(sorted(rv))


def _tags_to_dict(tags):
# type: (MetricTagsInternal) -> Dict[str, Any]
rv = {} # type: Dict[str, Any]
for tag_name, tag_value in tags:
old_value = rv.get(tag_name)
if old_value is not None:
if isinstance(old_value, list):
old_value.append(tag_value)

Check warning on line 646 in sentry_sdk/metrics.py

View check run for this annotation

Codecov / codecov/patch

sentry_sdk/metrics.py#L646

Added line #L646 was not covered by tests
else:
rv[tag_name] = [old_value, tag_value]

Check warning on line 648 in sentry_sdk/metrics.py

View check run for this annotation

Codecov / codecov/patch

sentry_sdk/metrics.py#L648

Added line #L648 was not covered by tests
else:
rv[tag_name] = tag_value
return rv


def _get_aggregator_and_update_tags(key, tags):
    # type: (str, Optional[MetricTags]) -> Tuple[Optional[MetricsAggregator], Optional[LocalAggregator], Optional[MetricTags]]
    """Returns the current metrics aggregator, an optional span-local
    aggregator, and the tags augmented with defaults.

    The scraped diff retained both the removed and the added lines here
    (duplicate type comments, duplicate ``return None`` statements and a
    duplicated callback lookup); this is the reconstructed merged version.

    Returns ``(None, None, tags)`` when no client/aggregator is available,
    and ``(None, None, updated_tags)`` when the ``before_emit_metric``
    callback vetoes the metric.
    """
    hub = sentry_sdk.Hub.current
    client = hub.client
    if client is None or client.metrics_aggregator is None:
        return None, None, tags

    experiments = client.options.get("_experiments", {})

    # Augment user tags with client-level defaults without overriding them.
    updated_tags = dict(tags or ())  # type: Dict[str, MetricTagValue]
    updated_tags.setdefault("release", client.options["release"])
    updated_tags.setdefault("environment", client.options["environment"])

    scope = hub.scope
    local_aggregator = None

    # We go with the low-level API here to access transaction information as
    # this one is the same between just errors and errors + performance
    transaction_source = scope._transaction_info.get("source")
    if transaction_source in GOOD_TRANSACTION_SOURCES:
        transaction_name = scope._transaction
        if transaction_name:
            updated_tags.setdefault("transaction", transaction_name)
        if scope._span is not None:
            # Sample span summaries; the optional callback can veto per metric.
            sample_rate = experiments.get("metrics_summary_sample_rate") or 0.0
            should_summarize_metric_callback = experiments.get(
                "should_summarize_metric"
            )
            if random.random() < sample_rate and (
                should_summarize_metric_callback is None
                or should_summarize_metric_callback(key, updated_tags)
            ):
                local_aggregator = scope._span._get_local_aggregator()

    before_emit_callback = experiments.get("before_emit_metric")
    if before_emit_callback is not None:
        # recursion_protection avoids re-entering metrics code from the
        # user-supplied callback.
        with recursion_protection() as in_metrics:
            if not in_metrics:
                if not before_emit_callback(key, updated_tags):
                    return None, None, updated_tags

    return client.metrics_aggregator, local_aggregator, updated_tags


def incr(
Expand All @@ -615,9 +706,11 @@
):
# type: (...) -> None
"""Increments a counter."""
aggregator, tags = _get_aggregator_and_update_tags(key, tags)
aggregator, local_aggregator, tags = _get_aggregator_and_update_tags(key, tags)
if aggregator is not None:
aggregator.add("c", key, value, unit, tags, timestamp, stacklevel)
aggregator.add(
"c", key, value, unit, tags, timestamp, local_aggregator, stacklevel
)


class _Timing(object):
Expand All @@ -637,6 +730,7 @@
self.value = value
self.unit = unit
self.entered = None # type: Optional[float]
self._span = None # type: Optional[sentry_sdk.tracing.Span]
self.stacklevel = stacklevel

def _validate_invocation(self, context):
Expand All @@ -650,17 +744,37 @@
# type: (...) -> _Timing
self.entered = TIMING_FUNCTIONS[self.unit]()
self._validate_invocation("context-manager")
self._span = sentry_sdk.start_span(op="metric.timing", description=self.key)
if self.tags:
for key, value in self.tags.items():
if isinstance(value, (tuple, list)):
value = ",".join(sorted(map(str, value)))

Check warning on line 751 in sentry_sdk/metrics.py

View check run for this annotation

Codecov / codecov/patch

sentry_sdk/metrics.py#L751

Added line #L751 was not covered by tests
self._span.set_tag(key, value)
self._span.__enter__()
return self

def __exit__(self, exc_type, exc_value, tb):
    # type: (Any, Any, Any) -> None
    """Close the timing span and record the elapsed time as a distribution.

    The scraped diff fused the old and new ``aggregator.add`` argument
    lists (a syntax error) and kept the superseded two-value unpacking of
    ``_get_aggregator_and_update_tags``; this is the reconstructed merged
    version.
    """
    assert self._span, "did not enter"
    aggregator, local_aggregator, tags = _get_aggregator_and_update_tags(
        self.key, self.tags
    )
    if aggregator is not None:
        # Elapsed time measured with the same clock used in __enter__.
        elapsed = TIMING_FUNCTIONS[self.unit]() - self.entered  # type: ignore
        aggregator.add(
            "d",
            self.key,
            elapsed,
            self.unit,
            tags,
            self.timestamp,
            local_aggregator,
            self.stacklevel,
        )

    self._span.__exit__(exc_type, exc_value, tb)
    self._span = None

def __call__(self, f):
# type: (Any) -> Any
self._validate_invocation("decorator")
Expand Down Expand Up @@ -698,9 +812,11 @@
- it can be used as a decorator
"""
if value is not None:
aggregator, tags = _get_aggregator_and_update_tags(key, tags)
aggregator, local_aggregator, tags = _get_aggregator_and_update_tags(key, tags)
if aggregator is not None:
aggregator.add("d", key, value, unit, tags, timestamp, stacklevel)
aggregator.add(
"d", key, value, unit, tags, timestamp, local_aggregator, stacklevel
)
return _Timing(key, tags, timestamp, value, unit, stacklevel)


Expand All @@ -714,9 +830,11 @@
):
# type: (...) -> None
"""Emits a distribution."""
aggregator, tags = _get_aggregator_and_update_tags(key, tags)
aggregator, local_aggregator, tags = _get_aggregator_and_update_tags(key, tags)
if aggregator is not None:
aggregator.add("d", key, value, unit, tags, timestamp, stacklevel)
aggregator.add(
"d", key, value, unit, tags, timestamp, local_aggregator, stacklevel
)


def set(
Expand All @@ -729,21 +847,25 @@
):
# type: (...) -> None
"""Emits a set."""
aggregator, tags = _get_aggregator_and_update_tags(key, tags)
aggregator, local_aggregator, tags = _get_aggregator_and_update_tags(key, tags)
if aggregator is not None:
aggregator.add("s", key, value, unit, tags, timestamp, stacklevel)
aggregator.add(
"s", key, value, unit, tags, timestamp, local_aggregator, stacklevel
)


def gauge(
    key,  # type: str
    value,  # type: float
    unit="none",  # type: MeasurementUnit
    tags=None,  # type: Optional[MetricTags]
    timestamp=None,  # type: Optional[Union[float, datetime]]
    stacklevel=0,  # type: int
):
    # type: (...) -> None
    """Emits a gauge.

    The scraped diff kept both the removed and added lines (two ``unit``
    type comments, two aggregator lookups and fused ``add`` calls); this is
    the reconstructed added-side version.
    """
    aggregator, local_aggregator, tags = _get_aggregator_and_update_tags(key, tags)
    if aggregator is not None:
        aggregator.add(
            "g", key, value, unit, tags, timestamp, local_aggregator, stacklevel
        )