Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add exemplar support to _count #3996

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both in PrometheusDistributionSummary and in PrometheusTimer, the last sampled exemplars can be recorded in two ways:

  1. PrometheusHistogram records it so that the exemplar will not be sampled twice (one sample call in PrometheusHistogram): Histogram use-case (there are buckets + _count)
  2. If that is not available, PrometheusDistributionSummary/PrometheusTimer record it directly: Summary use-case (no buckets, just _count)

Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,17 @@
*/
package io.micrometer.prometheus;

import io.micrometer.common.lang.NonNull;
import io.micrometer.common.lang.Nullable;
import io.micrometer.core.instrument.AbstractDistributionSummary;
import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.distribution.*;
import io.prometheus.client.exemplars.CounterExemplarSampler;
import io.prometheus.client.exemplars.Exemplar;
import io.prometheus.client.exemplars.HistogramExemplarSampler;
import io.prometheus.client.exemplars.ExemplarSampler;

import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.DoubleAdder;
import java.util.concurrent.atomic.LongAdder;

Expand All @@ -47,10 +50,16 @@ public class PrometheusDistributionSummary extends AbstractDistributionSummary {
@Nullable
private final Histogram histogram;

private boolean exemplarsEnabled = false;
@Nullable
private final ExemplarSampler exemplarSampler;

@Nullable
private final AtomicReference<Exemplar> lastExemplar;

private boolean histogramExemplarsEnabled = false;

PrometheusDistributionSummary(Id id, Clock clock, DistributionStatisticConfig distributionStatisticConfig,
double scale, HistogramFlavor histogramFlavor, @Nullable HistogramExemplarSampler exemplarSampler) {
double scale, HistogramFlavor histogramFlavor, @Nullable ExemplarSampler exemplarSampler) {
super(id, clock,
DistributionStatisticConfig.builder()
.percentilesHistogram(false)
Expand All @@ -68,7 +77,7 @@ public class PrometheusDistributionSummary extends AbstractDistributionSummary {
PrometheusHistogram prometheusHistogram = new PrometheusHistogram(clock,
distributionStatisticConfig, exemplarSampler);
this.histogram = prometheusHistogram;
this.exemplarsEnabled = prometheusHistogram.isExemplarsEnabled();
this.histogramExemplarsEnabled = prometheusHistogram.isExemplarsEnabled();
break;
case VictoriaMetrics:
this.histogram = new FixedBoundaryVictoriaMetricsHistogram();
Expand All @@ -81,6 +90,15 @@ public class PrometheusDistributionSummary extends AbstractDistributionSummary {
else {
this.histogram = null;
}

if (!this.histogramExemplarsEnabled && exemplarSampler != null) {
this.exemplarSampler = exemplarSampler;
this.lastExemplar = new AtomicReference<>();
}
else {
this.exemplarSampler = null;
this.lastExemplar = null;
}
}

@Override
Expand All @@ -89,20 +107,45 @@ protected void recordNonNegative(double amount) {
this.amount.add(amount);
max.record(amount);

if (histogram != null)
if (histogram != null) {
histogram.recordDouble(amount);
}
if (!histogramExemplarsEnabled && exemplarSampler != null) {
updateLastExemplar(amount, exemplarSampler);
}
}

/**
 * Offers {@code amount} to the sampler and stores the exemplar it produces as the last exemplar.
 * Works like {@code lastExemplar.updateAndGet(...)}, except that the stored exemplar is left
 * untouched when the sampler returns {@code null} or returns the exemplar that was just read.
 */
private void updateLastExemplar(double amount, @NonNull CounterExemplarSampler exemplarSampler) {
    while (true) {
        Exemplar current = lastExemplar.get();
        Exemplar candidate = exemplarSampler.sample(amount, current);
        // Nothing to store: the sampler declined or handed back the current exemplar.
        if (candidate == null || candidate == current) {
            return;
        }
        // Done once the swap succeeds; a failed swap means the stored reference no longer
        // matches what was read, so read and sample again.
        if (lastExemplar.compareAndSet(current, candidate)) {
            return;
        }
    }
}

@Nullable
Exemplar[] exemplars() {
if (exemplarsEnabled) {
Exemplar[] histogramExemplars() {
if (histogramExemplarsEnabled) {
return ((PrometheusHistogram) histogram).exemplars();
}
else {
return null;
}
}

/**
 * Returns the most recently stored exemplar, or {@code null} when there is none.
 * With histogram exemplars enabled the value is read from the backing
 * {@code PrometheusHistogram}; otherwise it is read from this summary's own reference,
 * provided that reference was created.
 */
@Nullable
Exemplar lastExemplar() {
    if (!histogramExemplarsEnabled) {
        return (lastExemplar == null) ? null : lastExemplar.get();
    }
    return ((PrometheusHistogram) histogram).lastExemplar();
}

@Override
public long count() {
return count.longValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceArray;

import static java.util.concurrent.TimeUnit.NANOSECONDS;
Expand All @@ -41,20 +42,23 @@
*/
class PrometheusHistogram extends TimeWindowFixedBoundaryHistogram {

@Nullable
private final double[] buckets;

@Nullable
private final AtomicReferenceArray<Exemplar> exemplars;

@Nullable
private final AtomicReference<Exemplar> lastExemplar;

@Nullable
private final HistogramExemplarSampler exemplarSampler;

PrometheusHistogram(Clock clock, DistributionStatisticConfig config,
@Nullable HistogramExemplarSampler exemplarSampler) {
super(clock, DistributionStatisticConfig.builder()
.expiry(Duration.ofDays(1825)) // effectively
// never
// roll
// over
// effectively never rolls over
.expiry(Duration.ofDays(1825))
.bufferLength(1)
.build()
.merge(config), true);
Expand All @@ -70,10 +74,12 @@ class PrometheusHistogram extends TimeWindowFixedBoundaryHistogram {
this.buckets = originalBuckets;
}
this.exemplars = new AtomicReferenceArray<>(this.buckets.length);
this.lastExemplar = new AtomicReference<>();
}
else {
this.buckets = null;
this.exemplars = null;
this.lastExemplar = null;
}
}

Expand Down Expand Up @@ -107,16 +113,20 @@ private void updateExemplar(double value, @Nullable TimeUnit sourceUnit, @Nullab
int index) {
double bucketFrom = (index == 0) ? Double.NEGATIVE_INFINITY : buckets[index - 1];
double bucketTo = buckets[index];
jonatan-ivanov marked this conversation as resolved.
Show resolved Hide resolved
Exemplar prev;
Exemplar next;
Exemplar previusBucketExemplar;
Exemplar previousLastExemplar;
Exemplar nextExemplar;

double exemplarValue = (sourceUnit != null && destinationUnit != null)
? TimeUtils.convert(value, sourceUnit, destinationUnit) : value;
do {
prev = exemplars.get(index);
next = exemplarSampler.sample(exemplarValue, bucketFrom, bucketTo, prev);
previusBucketExemplar = exemplars.get(index);
previousLastExemplar = lastExemplar.get();
nextExemplar = exemplarSampler.sample(exemplarValue, bucketFrom, bucketTo, previusBucketExemplar);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please note that here, as well as in both PrometheusDistributionSummary and PrometheusTimer, the value of the last exemplar is the recorded value. I think this is expected, but the value for _count should not be the value that was recorded; in these cases it should be 1.0.
Because of this, the exemplar value for _count is set in the registry instead (see my comment there).

}
while (next != null && next != prev && !exemplars.compareAndSet(index, prev, next));
while (nextExemplar != null && nextExemplar != previusBucketExemplar
&& !(exemplars.compareAndSet(index, previusBucketExemplar, nextExemplar)
&& lastExemplar.compareAndSet(previousLastExemplar, nextExemplar)));
}

@Nullable
Expand All @@ -134,6 +144,11 @@ Exemplar[] exemplars() {
}
}

/**
 * Returns the exemplar most recently stored by this histogram.
 *
 * @return the last exemplar, or {@code null} if none has been stored yet or if the
 * constructor left the backing reference unset
 */
@Nullable
Exemplar lastExemplar() {
    // The field is @Nullable and the constructor assigns null on one of its branches,
    // so guard before dereferencing instead of risking a NullPointerException.
    AtomicReference<Exemplar> reference = this.lastExemplar;
    return (reference != null) ? reference.get() : null;
}

/**
* The least bucket that is less than or equal to a sample.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ public DistributionSummary newDistributionSummary(Meter.Id id,
}
}

Exemplar lastExemplar = summary.lastExemplar();
Collector.Type type = Collector.Type.SUMMARY;
if (histogramCounts.length > 0) {
// Prometheus doesn't balk at a metric being BOTH a histogram and a
Expand All @@ -244,7 +245,7 @@ public DistributionSummary newDistributionSummary(Meter.Id id,
case Prometheus:
histogramKeys.add("le");

Exemplar[] exemplars = summary.exemplars();
Exemplar[] exemplars = summary.histogramExemplars();

// satisfies
// https://prometheus.io/docs/concepts/metric_types/#histogram
Expand Down Expand Up @@ -295,8 +296,14 @@ public DistributionSummary newDistributionSummary(Meter.Id id,

}

samples.add(
new Collector.MetricFamilySamples.Sample(conventionName + "_count", tagKeys, tagValues, count));
if (lastExemplar == null) {
samples.add(new Collector.MetricFamilySamples.Sample(conventionName + "_count", tagKeys, tagValues,
count));
}
else {
samples.add(new Collector.MetricFamilySamples.Sample(conventionName + "_count", tagKeys, tagValues,
count, copyExemplarWithNewValue(1.0, lastExemplar)));
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here I'm setting a new value for the Exemplar. This is needed because the value of the last recorded Exemplar is the last value recorded (and sampled) by PrometheusDistributionSummary/PrometheusTimer. That conforms to the "contract" of .lastExemplar() (which can be used elsewhere, e.g. in _sum), but for _count the value should be 1.0, since the counter is incremented by 1 in these cases.

}

samples.add(new Collector.MetricFamilySamples.Sample(conventionName + "_sum", tagKeys, tagValues,
summary.totalAmount()));
Expand All @@ -316,7 +323,7 @@ protected io.micrometer.core.instrument.Timer newTimer(Meter.Id id,
PrometheusTimer timer = new PrometheusTimer(id, clock, distributionStatisticConfig, pauseDetector,
prometheusConfig.histogramFlavor(), exemplarSampler);
applyToCollector(id, (collector) -> addDistributionStatisticSamples(distributionStatisticConfig, collector,
timer, timer::exemplars, tagValues(id), false));
timer, timer::lastExemplar, timer::histogramExemplars, tagValues(id), false));
return timer;
}

Expand All @@ -338,7 +345,7 @@ protected LongTaskTimer newLongTaskTimer(Meter.Id id, DistributionStatisticConfi
LongTaskTimer ltt = new CumulativeHistogramLongTaskTimer(id, clock, getBaseTimeUnit(),
distributionStatisticConfig);
applyToCollector(id, (collector) -> addDistributionStatisticSamples(distributionStatisticConfig, collector, ltt,
() -> null, tagValues(id), true));
() -> null, () -> null, tagValues(id), true));
return ltt;
}

Expand Down Expand Up @@ -440,8 +447,8 @@ public CollectorRegistry getPrometheusRegistry() {
}

private void addDistributionStatisticSamples(DistributionStatisticConfig distributionStatisticConfig,
MicrometerCollector collector, HistogramSupport histogramSupport, Supplier<Exemplar[]> exemplarsSupplier,
List<String> tagValues, boolean forLongTaskTimer) {
MicrometerCollector collector, HistogramSupport histogramSupport, Supplier<Exemplar> lastExemplarSupplier,
Supplier<Exemplar[]> histogramExemplarsSupplier, List<String> tagValues, boolean forLongTaskTimer) {
collector.add(tagValues, (conventionName, tagKeys) -> {
Stream.Builder<Collector.MetricFamilySamples.Sample> samples = Stream.builder();

Expand All @@ -463,6 +470,7 @@ private void addDistributionStatisticSamples(DistributionStatisticConfig distrib
}
}

Exemplar lastExemplar = lastExemplarSupplier.get();
Collector.Type type = distributionStatisticConfig.isPublishingHistogram() ? Collector.Type.HISTOGRAM
: Collector.Type.SUMMARY;
if (histogramCounts.length > 0) {
Expand All @@ -477,7 +485,7 @@ private void addDistributionStatisticSamples(DistributionStatisticConfig distrib
case Prometheus:
histogramKeys.add("le");

Exemplar[] exemplars = exemplarsSupplier.get();
Exemplar[] exemplars = histogramExemplarsSupplier.get();

// satisfies
// https://prometheus.io/docs/concepts/metric_types/#histogram
Expand Down Expand Up @@ -523,8 +531,15 @@ private void addDistributionStatisticSamples(DistributionStatisticConfig distrib

}

samples.add(new Collector.MetricFamilySamples.Sample(
conventionName + (forLongTaskTimer ? "_active_count" : "_count"), tagKeys, tagValues, count));
if (lastExemplar == null) {
samples.add(new Collector.MetricFamilySamples.Sample(
conventionName + (forLongTaskTimer ? "_active_count" : "_count"), tagKeys, tagValues, count));
}
else {
samples.add(new Collector.MetricFamilySamples.Sample(
conventionName + (forLongTaskTimer ? "_active_count" : "_count"), tagKeys, tagValues, count,
copyExemplarWithNewValue(1.0, lastExemplar)));
}

samples.add(new Collector.MetricFamilySamples.Sample(
conventionName + (forLongTaskTimer ? "_duration_sum" : "_sum"), tagKeys, tagValues,
Expand All @@ -537,6 +552,15 @@ private void addDistributionStatisticSamples(DistributionStatisticConfig distrib
});
}

/**
 * Builds a new {@code Exemplar} that reuses the timestamp and the label name/value pairs of
 * {@code exemplar} but carries {@code newValue} as its value.
 *
 * @param newValue the value for the copy
 * @param exemplar the exemplar whose timestamp and labels are copied
 * @return the newly created exemplar
 */
private Exemplar copyExemplarWithNewValue(double newValue, Exemplar exemplar) {
    final int labelCount = exemplar.getNumberOfLabels();
    // Labels are flattened as name, value, name, value, ...
    final String[] flattenedLabels = new String[labelCount * 2];
    int position = 0;
    for (int labelIndex = 0; labelIndex < labelCount; labelIndex++) {
        flattenedLabels[position++] = exemplar.getLabelName(labelIndex);
        flattenedLabels[position++] = exemplar.getLabelValue(labelIndex);
    }
    return new Exemplar(newValue, exemplar.getTimestampMs(), flattenedLabels);
}

private void onMeterRemoved(Meter meter) {
MicrometerCollector collector = collectorMap.get(getConventionName(meter.getId()));
if (collector != null) {
Expand Down