Skip to content

Commit

Permalink
support exponential histograms in the prometheus bridge
Browse files Browse the repository at this point in the history
  • Loading branch information
dashpole committed Feb 14, 2024
1 parent 72f859c commit b04ab2e
Show file tree
Hide file tree
Showing 4 changed files with 188 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -15,6 +15,7 @@ The next release will require at least [Go 1.21].

- Add the new `go.opentelemetry.io/contrib/instrgen` package to provide auto-generated source code instrumentation. (#3068, #3108)
- Support [Go 1.22]. (#5082)
- Support Exponential (native) Histograms in `go.opentelemetry.io/contrib/bridges/prometheus`. (#5093)

### Removed

Expand Down
8 changes: 6 additions & 2 deletions bridges/prometheus/doc.go
Expand Up @@ -18,10 +18,14 @@
// with the OpenTelemetry SDK. This enables prometheus instrumentation libraries
// to be used with OpenTelemetry exporters, including OTLP.
//
// Prometheus histograms are translated to OpenTelemetry exponential histograms
// when native histograms are enabled in the Prometheus client. To enable
// Prometheus native histograms, set the (currently experimental) NativeHistogram...
// options of the prometheus [HistogramOpts] when creating prometheus histograms.
//
// Limitations:
// - Summary metrics are dropped by the bridge.
// - Prometheus histograms are translated to OpenTelemetry fixed-bucket
// histograms, rather than exponential histograms.
//
// [Prometheus Golang client library]: https://github.com/prometheus/client_golang
// [HistogramOpts]: https://github.com/prometheus/client_golang/blob/7ac90362b02729a65109b33d172bafb65d7dab50/prometheus/histogram.go#L359
package prometheus // import "go.opentelemetry.io/contrib/bridges/prometheus"
105 changes: 103 additions & 2 deletions bridges/prometheus/producer.go
Expand Up @@ -90,6 +90,10 @@ func convertPrometheusMetricsInto(promMetrics []*dto.MetricFamily, now time.Time
var errs multierr
otelMetrics := make([]metricdata.Metrics, 0)
for _, pm := range promMetrics {
if len(pm.GetMetric()) == 0 {
// Skip the metric if there are no observations
continue

Check warning on line 95 in bridges/prometheus/producer.go

View check run for this annotation

Codecov / codecov/patch

bridges/prometheus/producer.go#L94-L95

Added lines #L94 - L95 were not covered by tests
}
newMetric := metricdata.Metrics{
Name: pm.GetName(),
Description: pm.GetHelp(),
Expand All @@ -100,7 +104,11 @@ func convertPrometheusMetricsInto(promMetrics []*dto.MetricFamily, now time.Time
case dto.MetricType_COUNTER:
newMetric.Data = convertCounter(pm.GetMetric(), now)
case dto.MetricType_HISTOGRAM:
newMetric.Data = convertHistogram(pm.GetMetric(), now)
if isExponentialHistogram(pm.GetMetric()[0].GetHistogram()) {
newMetric.Data = convertExponentialHistogram(pm.GetMetric(), now)
} else {
newMetric.Data = convertHistogram(pm.GetMetric(), now)
}
default:
// MetricType_GAUGE_HISTOGRAM, MetricType_SUMMARY, MetricType_UNTYPED
errs = append(errs, fmt.Errorf("%w: %v for metric %v", errUnsupportedType, pm.GetType(), pm.GetName()))
Expand All @@ -111,6 +119,16 @@ func convertPrometheusMetricsInto(promMetrics []*dto.MetricFamily, now time.Time
return otelMetrics, errs.errOrNil()
}

func isExponentialHistogram(hist *dto.Histogram) bool {
// The prometheus go client ensures at least one of these is non-zero
// so it can be distinguished from a fixed-bucket histogram.
// https://github.com/prometheus/client_golang/blob/7ac90362b02729a65109b33d172bafb65d7dab50/prometheus/histogram.go#L818
return hist.GetZeroThreshold() > 0 ||
hist.GetZeroCount() > 0 ||
len(hist.GetPositiveSpan()) > 0 ||
len(hist.GetNegativeSpan()) > 0
}

func convertGauge(metrics []*dto.Metric, now time.Time) metricdata.Gauge[float64] {
otelGauge := metricdata.Gauge[float64]{
DataPoints: make([]metricdata.DataPoint[float64], len(metrics)),
Expand Down Expand Up @@ -155,8 +173,88 @@ func convertCounter(metrics []*dto.Metric, now time.Time) metricdata.Sum[float64
return otelCounter
}

func convertExponentialHistogram(metrics []*dto.Metric, now time.Time) metricdata.ExponentialHistogram[float64] {
otelExpHistogram := metricdata.ExponentialHistogram[float64]{
DataPoints: make([]metricdata.ExponentialHistogramDataPoint[float64], len(metrics)),
Temporality: metricdata.CumulativeTemporality,
}
for i, m := range metrics {
dp := metricdata.ExponentialHistogramDataPoint[float64]{
Attributes: convertLabels(m.GetLabel()),
StartTime: processStartTime,
Time: now,
Count: m.GetHistogram().GetSampleCount(),
Sum: m.GetHistogram().GetSampleSum(),
Scale: m.GetHistogram().GetSchema(),
ZeroCount: m.GetHistogram().GetZeroCount(),
ZeroThreshold: m.GetHistogram().GetZeroThreshold(),
PositiveBucket: convertExponentialBuckets(
m.GetHistogram().GetPositiveSpan(),
m.GetHistogram().GetPositiveDelta(),
),
NegativeBucket: convertExponentialBuckets(
m.GetHistogram().GetNegativeSpan(),
m.GetHistogram().GetNegativeDelta(),
),
// TODO: Support exemplars
}
createdTs := m.GetHistogram().GetCreatedTimestamp()
if createdTs.IsValid() {
dp.StartTime = createdTs.AsTime()
}
if m.GetTimestampMs() != 0 {
dp.Time = time.UnixMilli(m.GetTimestampMs())
}

Check warning on line 207 in bridges/prometheus/producer.go

View check run for this annotation

Codecov / codecov/patch

bridges/prometheus/producer.go#L206-L207

Added lines #L206 - L207 were not covered by tests
otelExpHistogram.DataPoints[i] = dp
}
return otelExpHistogram
}

func convertExponentialBuckets(bucketSpans []*dto.BucketSpan, deltas []int64) metricdata.ExponentialBucket {
if len(bucketSpans) == 0 {
return metricdata.ExponentialBucket{}
}
// Prometheus Native Histograms buckets are indexed by upper boundary
// while Exponential Histograms are indexed by lower boundary, the result
// being that the Offset fields are different-by-one.
initialOffset := bucketSpans[0].GetOffset() - 1
// We will have one bucket count for each delta, and zeros for the offsets
// after the initial offset.
lenCounts := int32(len(deltas))
for i, bs := range bucketSpans {
if i != 0 {
lenCounts += bs.GetOffset()
}
}
counts := make([]uint64, lenCounts)
deltaIndex := 0
countIndex := int32(0)
count := int64(0)
for i, bs := range bucketSpans {
// Do not insert zeroes if this is the first bucketSpan, since those
// zeroes are accounted for in the Offset field.
if i != 0 {
// Increase the count index by the Offset to insert Offset zeroes
countIndex += bs.GetOffset()
}
for j := uint32(0); j < bs.GetLength(); j++ {
// Convert deltas to the cumulative number of observations
count += deltas[deltaIndex]
deltaIndex++
// count should always be positive after accounting for deltas
if count > 0 {
counts[countIndex] = uint64(count)
}
countIndex++
}
}
return metricdata.ExponentialBucket{
Offset: initialOffset,
Counts: counts,
}
}

func convertHistogram(metrics []*dto.Metric, now time.Time) metricdata.Histogram[float64] {
// TODO: support converting Prometheus "native" histograms to OTel exponential histograms.
otelHistogram := metricdata.Histogram[float64]{
DataPoints: make([]metricdata.HistogramDataPoint[float64], len(metrics)),
Temporality: metricdata.CumulativeTemporality,
Expand Down Expand Up @@ -186,6 +284,9 @@ func convertHistogram(metrics []*dto.Metric, now time.Time) metricdata.Histogram
}

func convertBuckets(buckets []*dto.Bucket) ([]float64, []uint64, []metricdata.Exemplar[float64]) {
if len(buckets) == 0 {
return nil, nil, nil
}

Check warning on line 289 in bridges/prometheus/producer.go

View check run for this annotation

Codecov / codecov/patch

bridges/prometheus/producer.go#L288-L289

Added lines #L288 - L289 were not covered by tests
bounds := make([]float64, len(buckets)-1)
bucketCounts := make([]uint64, len(buckets))
exemplars := make([]metricdata.Exemplar[float64], 0)
Expand Down
78 changes: 78 additions & 0 deletions bridges/prometheus/producer_test.go
Expand Up @@ -195,6 +195,60 @@ func TestProduce(t *testing.T) {
},
}},
},
{
name: "exponential histogram",
testFn: func(reg *prometheus.Registry) {
metric := prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "test_exponential_histogram_metric",
Help: "An exponential histogram metric for testing",
// This enables collection of native histograms in the prometheus client.
NativeHistogramBucketFactor: 1.5,
ConstLabels: prometheus.Labels(map[string]string{
"foo": "bar",
}),
})
reg.MustRegister(metric)
metric.Observe(78.3)
metric.Observe(2.3)
metric.Observe(2.3)
metric.Observe(.5)
metric.Observe(-78.3)
metric.Observe(-.15)
metric.Observe(0.0)
},
expected: []metricdata.ScopeMetrics{{
Scope: instrumentation.Scope{
Name: scopeName,
},
Metrics: []metricdata.Metrics{
{
Name: "test_exponential_histogram_metric",
Description: "An exponential histogram metric for testing",
Data: metricdata.ExponentialHistogram[float64]{
Temporality: metricdata.CumulativeTemporality,
DataPoints: []metricdata.ExponentialHistogramDataPoint[float64]{
{
Count: 7,
Sum: 4.949999999999994,
Scale: 1,
ZeroCount: 1,
PositiveBucket: metricdata.ExponentialBucket{
Offset: -3,
Counts: []uint64{1, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1},
},
NegativeBucket: metricdata.ExponentialBucket{
Offset: -6,
Counts: []uint64{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1},
},
Attributes: attribute.NewSet(attribute.String("foo", "bar")),
ZeroThreshold: prometheus.DefNativeHistogramZeroThreshold,
},
},
},
},
},
}},
},
{
name: "partial success",
testFn: func(reg *prometheus.Registry) {
Expand Down Expand Up @@ -315,6 +369,30 @@ func TestProduceForStartTime(t *testing.T) {
return sts
},
},
{
name: "exponential histogram",
testFn: func(reg *prometheus.Registry) {
metric := prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "test_exponential_histogram_metric",
Help: "An exponential histogram metric for testing",
// This enables collection of native histograms in the prometheus client.
NativeHistogramBucketFactor: 1.5,
ConstLabels: prometheus.Labels(map[string]string{
"foo": "bar",
}),
})
reg.MustRegister(metric)
metric.Observe(78.3)
},
startTimeFn: func(aggr metricdata.Aggregation) []time.Time {
dps := aggr.(metricdata.ExponentialHistogram[float64]).DataPoints
sts := make([]time.Time, len(dps))
for i, dp := range dps {
sts[i] = dp.StartTime
}
return sts
},
},
}
for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
Expand Down

0 comments on commit b04ab2e

Please sign in to comment.