Skip to content

Commit

Permalink
Add exemplar support to _count (#3996)
Browse files Browse the repository at this point in the history
Prometheus did not support exemplars for Summaries and it did not
support exemplars for anything other than buckets on Histograms.
Since Prometheus now supports exemplars on every time series,
we can add exemplars to Summary/Histogram _count, see:
prometheus/prometheus#11982
  • Loading branch information
jonatan-ivanov committed Aug 15, 2023
1 parent b5b0a75 commit 1354fc2
Show file tree
Hide file tree
Showing 5 changed files with 203 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,17 @@
*/
package io.micrometer.prometheus;

import io.micrometer.common.lang.NonNull;
import io.micrometer.common.lang.Nullable;
import io.micrometer.core.instrument.AbstractDistributionSummary;
import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.distribution.*;
import io.prometheus.client.exemplars.CounterExemplarSampler;
import io.prometheus.client.exemplars.Exemplar;
import io.prometheus.client.exemplars.HistogramExemplarSampler;
import io.prometheus.client.exemplars.ExemplarSampler;

import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.DoubleAdder;
import java.util.concurrent.atomic.LongAdder;

Expand All @@ -47,10 +50,16 @@ public class PrometheusDistributionSummary extends AbstractDistributionSummary {
@Nullable
private final Histogram histogram;

private boolean exemplarsEnabled = false;
@Nullable
private final ExemplarSampler exemplarSampler;

@Nullable
private final AtomicReference<Exemplar> lastExemplar;

private boolean histogramExemplarsEnabled = false;

PrometheusDistributionSummary(Id id, Clock clock, DistributionStatisticConfig distributionStatisticConfig,
double scale, HistogramFlavor histogramFlavor, @Nullable HistogramExemplarSampler exemplarSampler) {
double scale, HistogramFlavor histogramFlavor, @Nullable ExemplarSampler exemplarSampler) {
super(id, clock,
DistributionStatisticConfig.builder()
.percentilesHistogram(false)
Expand All @@ -68,7 +77,7 @@ public class PrometheusDistributionSummary extends AbstractDistributionSummary {
PrometheusHistogram prometheusHistogram = new PrometheusHistogram(clock,
distributionStatisticConfig, exemplarSampler);
this.histogram = prometheusHistogram;
this.exemplarsEnabled = prometheusHistogram.isExemplarsEnabled();
this.histogramExemplarsEnabled = prometheusHistogram.isExemplarsEnabled();
break;
case VictoriaMetrics:
this.histogram = new FixedBoundaryVictoriaMetricsHistogram();
Expand All @@ -81,6 +90,15 @@ public class PrometheusDistributionSummary extends AbstractDistributionSummary {
else {
this.histogram = null;
}

if (!this.histogramExemplarsEnabled && exemplarSampler != null) {
this.exemplarSampler = exemplarSampler;
this.lastExemplar = new AtomicReference<>();
}
else {
this.exemplarSampler = null;
this.lastExemplar = null;
}
}

@Override
Expand All @@ -89,20 +107,45 @@ protected void recordNonNegative(double amount) {
this.amount.add(amount);
max.record(amount);

if (histogram != null)
if (histogram != null) {
histogram.recordDouble(amount);
}
if (!histogramExemplarsEnabled && exemplarSampler != null) {
updateLastExemplar(amount, exemplarSampler);
}
}

// Similar to exemplar.updateAndGet(...) but it does nothing if the next value is null
private void updateLastExemplar(double amount, @NonNull CounterExemplarSampler exemplarSampler) {
Exemplar prev;
Exemplar next;
do {
prev = lastExemplar.get();
next = exemplarSampler.sample(amount, prev);
}
while (next != null && next != prev && !lastExemplar.compareAndSet(prev, next));
}

@Nullable
Exemplar[] exemplars() {
if (exemplarsEnabled) {
Exemplar[] histogramExemplars() {
if (histogramExemplarsEnabled) {
return ((PrometheusHistogram) histogram).exemplars();
}
else {
return null;
}
}

@Nullable
Exemplar lastExemplar() {
if (histogramExemplarsEnabled) {
return ((PrometheusHistogram) histogram).lastExemplar();
}
else {
return lastExemplar != null ? lastExemplar.get() : null;
}
}

@Override
public long count() {
return count.longValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceArray;

import static java.util.concurrent.TimeUnit.NANOSECONDS;
Expand All @@ -41,20 +42,23 @@
*/
class PrometheusHistogram extends TimeWindowFixedBoundaryHistogram {

@Nullable
private final double[] buckets;

@Nullable
private final AtomicReferenceArray<Exemplar> exemplars;

@Nullable
private final AtomicReference<Exemplar> lastExemplar;

@Nullable
private final HistogramExemplarSampler exemplarSampler;

PrometheusHistogram(Clock clock, DistributionStatisticConfig config,
@Nullable HistogramExemplarSampler exemplarSampler) {
super(clock, DistributionStatisticConfig.builder()
.expiry(Duration.ofDays(1825)) // effectively
// never
// roll
// over
// effectively never rolls over
.expiry(Duration.ofDays(1825))
.bufferLength(1)
.build()
.merge(config), true);
Expand All @@ -70,10 +74,12 @@ class PrometheusHistogram extends TimeWindowFixedBoundaryHistogram {
this.buckets = originalBuckets;
}
this.exemplars = new AtomicReferenceArray<>(this.buckets.length);
this.lastExemplar = new AtomicReference<>();
}
else {
this.buckets = null;
this.exemplars = null;
this.lastExemplar = null;
}
}

Expand Down Expand Up @@ -107,16 +113,20 @@ private void updateExemplar(double value, @Nullable TimeUnit sourceUnit, @Nullab
int index) {
double bucketFrom = (index == 0) ? Double.NEGATIVE_INFINITY : buckets[index - 1];
double bucketTo = buckets[index];
Exemplar prev;
Exemplar next;
Exemplar previusBucketExemplar;
Exemplar previousLastExemplar;
Exemplar nextExemplar;

double exemplarValue = (sourceUnit != null && destinationUnit != null)
? TimeUtils.convert(value, sourceUnit, destinationUnit) : value;
do {
prev = exemplars.get(index);
next = exemplarSampler.sample(exemplarValue, bucketFrom, bucketTo, prev);
previusBucketExemplar = exemplars.get(index);
previousLastExemplar = lastExemplar.get();
nextExemplar = exemplarSampler.sample(exemplarValue, bucketFrom, bucketTo, previusBucketExemplar);
}
while (next != null && next != prev && !exemplars.compareAndSet(index, prev, next));
while (nextExemplar != null && nextExemplar != previusBucketExemplar
&& !(exemplars.compareAndSet(index, previusBucketExemplar, nextExemplar)
&& lastExemplar.compareAndSet(previousLastExemplar, nextExemplar)));
}

@Nullable
Expand All @@ -134,6 +144,11 @@ Exemplar[] exemplars() {
}
}

@Nullable
Exemplar lastExemplar() {
return this.lastExemplar.get();
}

/**
* The least bucket that is less than or equal to a sample.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ public DistributionSummary newDistributionSummary(Meter.Id id,
}
}

Exemplar lastExemplar = summary.lastExemplar();
Collector.Type type = Collector.Type.SUMMARY;
if (histogramCounts.length > 0) {
// Prometheus doesn't balk at a metric being BOTH a histogram and a
Expand All @@ -244,7 +245,7 @@ public DistributionSummary newDistributionSummary(Meter.Id id,
case Prometheus:
histogramKeys.add("le");

Exemplar[] exemplars = summary.exemplars();
Exemplar[] exemplars = summary.histogramExemplars();

// satisfies
// https://prometheus.io/docs/concepts/metric_types/#histogram
Expand Down Expand Up @@ -295,8 +296,14 @@ public DistributionSummary newDistributionSummary(Meter.Id id,

}

samples.add(
new Collector.MetricFamilySamples.Sample(conventionName + "_count", tagKeys, tagValues, count));
if (lastExemplar == null) {
samples.add(new Collector.MetricFamilySamples.Sample(conventionName + "_count", tagKeys, tagValues,
count));
}
else {
samples.add(new Collector.MetricFamilySamples.Sample(conventionName + "_count", tagKeys, tagValues,
count, copyExemplarWithNewValue(1.0, lastExemplar)));
}

samples.add(new Collector.MetricFamilySamples.Sample(conventionName + "_sum", tagKeys, tagValues,
summary.totalAmount()));
Expand All @@ -316,7 +323,7 @@ protected io.micrometer.core.instrument.Timer newTimer(Meter.Id id,
PrometheusTimer timer = new PrometheusTimer(id, clock, distributionStatisticConfig, pauseDetector,
prometheusConfig.histogramFlavor(), exemplarSampler);
applyToCollector(id, (collector) -> addDistributionStatisticSamples(distributionStatisticConfig, collector,
timer, timer::exemplars, tagValues(id), false));
timer, timer::lastExemplar, timer::histogramExemplars, tagValues(id), false));
return timer;
}

Expand All @@ -338,7 +345,7 @@ protected LongTaskTimer newLongTaskTimer(Meter.Id id, DistributionStatisticConfi
LongTaskTimer ltt = new CumulativeHistogramLongTaskTimer(id, clock, getBaseTimeUnit(),
distributionStatisticConfig);
applyToCollector(id, (collector) -> addDistributionStatisticSamples(distributionStatisticConfig, collector, ltt,
() -> null, tagValues(id), true));
() -> null, () -> null, tagValues(id), true));
return ltt;
}

Expand Down Expand Up @@ -440,8 +447,8 @@ public CollectorRegistry getPrometheusRegistry() {
}

private void addDistributionStatisticSamples(DistributionStatisticConfig distributionStatisticConfig,
MicrometerCollector collector, HistogramSupport histogramSupport, Supplier<Exemplar[]> exemplarsSupplier,
List<String> tagValues, boolean forLongTaskTimer) {
MicrometerCollector collector, HistogramSupport histogramSupport, Supplier<Exemplar> lastExemplarSupplier,
Supplier<Exemplar[]> histogramExemplarsSupplier, List<String> tagValues, boolean forLongTaskTimer) {
collector.add(tagValues, (conventionName, tagKeys) -> {
Stream.Builder<Collector.MetricFamilySamples.Sample> samples = Stream.builder();

Expand All @@ -463,6 +470,7 @@ private void addDistributionStatisticSamples(DistributionStatisticConfig distrib
}
}

Exemplar lastExemplar = lastExemplarSupplier.get();
Collector.Type type = distributionStatisticConfig.isPublishingHistogram() ? Collector.Type.HISTOGRAM
: Collector.Type.SUMMARY;
if (histogramCounts.length > 0) {
Expand All @@ -477,7 +485,7 @@ private void addDistributionStatisticSamples(DistributionStatisticConfig distrib
case Prometheus:
histogramKeys.add("le");

Exemplar[] exemplars = exemplarsSupplier.get();
Exemplar[] exemplars = histogramExemplarsSupplier.get();

// satisfies
// https://prometheus.io/docs/concepts/metric_types/#histogram
Expand Down Expand Up @@ -523,8 +531,15 @@ private void addDistributionStatisticSamples(DistributionStatisticConfig distrib

}

samples.add(new Collector.MetricFamilySamples.Sample(
conventionName + (forLongTaskTimer ? "_active_count" : "_count"), tagKeys, tagValues, count));
if (lastExemplar == null) {
samples.add(new Collector.MetricFamilySamples.Sample(
conventionName + (forLongTaskTimer ? "_active_count" : "_count"), tagKeys, tagValues, count));
}
else {
samples.add(new Collector.MetricFamilySamples.Sample(
conventionName + (forLongTaskTimer ? "_active_count" : "_count"), tagKeys, tagValues, count,
copyExemplarWithNewValue(1.0, lastExemplar)));
}

samples.add(new Collector.MetricFamilySamples.Sample(
conventionName + (forLongTaskTimer ? "_duration_sum" : "_sum"), tagKeys, tagValues,
Expand All @@ -537,6 +552,15 @@ private void addDistributionStatisticSamples(DistributionStatisticConfig distrib
});
}

private Exemplar copyExemplarWithNewValue(double newValue, Exemplar exemplar) {
String[] labels = new String[exemplar.getNumberOfLabels() * 2];
for (int i = 0; i < exemplar.getNumberOfLabels(); i++) {
labels[2 * i] = exemplar.getLabelName(i);
labels[2 * i + 1] = exemplar.getLabelValue(i);
}
return new Exemplar(newValue, exemplar.getTimestampMs(), labels);
}

private void onMeterRemoved(Meter meter) {
MicrometerCollector collector = collectorMap.get(getConventionName(meter.getId()));
if (collector != null) {
Expand Down

0 comments on commit 1354fc2

Please sign in to comment.