Skip to content

Commit

Permalink
implement WithExplicitBucketBoundaries option in the metric SDK
Browse files Browse the repository at this point in the history
  • Loading branch information
dashpole committed Oct 26, 2023
1 parent 0f5565a commit b6ed43a
Show file tree
Hide file tree
Showing 6 changed files with 229 additions and 34 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -20,6 +20,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Add `Version` function in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc`. (#4660)
- Add `Version` function in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp`. (#4660)
- Add Summary, SummaryDataPoint, and QuantileValue to `go.opentelemetry.io/sdk/metric/metricdata`. (#4622)
- Add support for `WithExplicitBucketBoundaries` in `go.opentelemetry.io/otel/sdk/metric`. (#4605)

### Deprecated

Expand Down
54 changes: 50 additions & 4 deletions sdk/metric/meter.go
Expand Up @@ -95,9 +95,8 @@ func (m *meter) Int64UpDownCounter(name string, options ...metric.Int64UpDownCou
// distribution of int64 measurements during a computational operation.
func (m *meter) Int64Histogram(name string, options ...metric.Int64HistogramOption) (metric.Int64Histogram, error) {
cfg := metric.NewInt64HistogramConfig(options...)
const kind = InstrumentKindHistogram
p := int64InstProvider{m}
i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
i, err := p.lookupHistogram(name, cfg)
if err != nil {
return i, err
}
Expand Down Expand Up @@ -188,9 +187,8 @@ func (m *meter) Float64UpDownCounter(name string, options ...metric.Float64UpDow
// distribution of float64 measurements during a computational operation.
func (m *meter) Float64Histogram(name string, options ...metric.Float64HistogramOption) (metric.Float64Histogram, error) {
cfg := metric.NewFloat64HistogramConfig(options...)
const kind = InstrumentKindHistogram
p := float64InstProvider{m}
i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
i, err := p.lookupHistogram(name, cfg)
if err != nil {
return i, err
}
Expand Down Expand Up @@ -456,12 +454,36 @@ func (p int64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([]ag
return p.int64Resolver.Aggregators(inst)
}

// histogramAggs resolves the measure functions for an int64 histogram named
// name, forwarding any explicit bucket boundaries from cfg to the resolver.
// Boundaries that fail validation are dropped (the resolver then uses its
// defaults) and the validation error is joined with any resolution error.
func (p int64InstProvider) histogramAggs(name string, cfg metric.Int64HistogramConfig) ([]aggregate.Measure[int64], error) {
	bounds := cfg.ExplicitBucketBoundaries()
	validationErr := AggregationExplicitBucketHistogram{Boundaries: bounds}.err()
	if validationErr != nil {
		// Invalid boundaries are ignored rather than failing creation.
		bounds = nil
	}
	i := Instrument{
		Name:        name,
		Description: cfg.Description(),
		Unit:        cfg.Unit(),
		Kind:        InstrumentKindHistogram,
		Scope:       p.scope,
	}
	m, resolveErr := p.int64Resolver.HistogramAggregators(i, bounds)
	return m, errors.Join(validationErr, resolveErr)
}

// lookup returns the resolved instrumentImpl.
func (p int64InstProvider) lookup(kind InstrumentKind, name, desc, u string) (*int64Inst, error) {
	measures, err := p.aggs(kind, name, desc, u)
	inst := &int64Inst{measures: measures}
	return inst, err
}

// lookupHistogram returns the resolved instrumentImpl.
func (p int64InstProvider) lookupHistogram(name string, cfg metric.Int64HistogramConfig) (*int64Inst, error) {
	measures, err := p.histogramAggs(name, cfg)
	inst := &int64Inst{measures: measures}
	return inst, err
}

// float64InstProvider provides float64 OpenTelemetry instruments.
type float64InstProvider struct{ *meter }

Expand All @@ -476,12 +498,36 @@ func (p float64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([]
return p.float64Resolver.Aggregators(inst)
}

// histogramAggs resolves the measure functions for a float64 histogram named
// name, forwarding any explicit bucket boundaries from cfg to the resolver.
// Boundaries that fail validation are dropped (the resolver then uses its
// defaults) and the validation error is joined with any resolution error.
func (p float64InstProvider) histogramAggs(name string, cfg metric.Float64HistogramConfig) ([]aggregate.Measure[float64], error) {
	bounds := cfg.ExplicitBucketBoundaries()
	validationErr := AggregationExplicitBucketHistogram{Boundaries: bounds}.err()
	if validationErr != nil {
		// Invalid boundaries are ignored rather than failing creation.
		bounds = nil
	}
	i := Instrument{
		Name:        name,
		Description: cfg.Description(),
		Unit:        cfg.Unit(),
		Kind:        InstrumentKindHistogram,
		Scope:       p.scope,
	}
	m, resolveErr := p.float64Resolver.HistogramAggregators(i, bounds)
	return m, errors.Join(validationErr, resolveErr)
}

// lookup returns the resolved instrumentImpl.
func (p float64InstProvider) lookup(kind InstrumentKind, name, desc, u string) (*float64Inst, error) {
	measures, err := p.aggs(kind, name, desc, u)
	inst := &float64Inst{measures: measures}
	return inst, err
}

// lookupHistogram returns the resolved instrumentImpl.
func (p float64InstProvider) lookupHistogram(name string, cfg metric.Float64HistogramConfig) (*float64Inst, error) {
	measures, err := p.histogramAggs(name, cfg)
	inst := &float64Inst{measures: measures}
	return inst, err
}

type int64ObservProvider struct{ *meter }

func (p int64ObservProvider) lookup(kind InstrumentKind, name, desc, u string) (int64Observable, error) {
Expand Down
83 changes: 83 additions & 0 deletions sdk/metric/meter_test.go
Expand Up @@ -16,6 +16,7 @@ package metric

import (
"context"
"errors"
"fmt"
"strings"
"sync"
Expand Down Expand Up @@ -550,6 +551,17 @@ func TestMeterCreatesInstrumentsValidations(t *testing.T) {

wantErr: fmt.Errorf("%w: _: must start with a letter", ErrInstrumentName),
},
{
name: "Int64Histogram with invalid buckets",

fn: func(t *testing.T, m metric.Meter) error {
i, err := m.Int64Histogram("histogram", metric.WithExplicitBucketBoundaries(-1, 1, -5))
assert.NotNil(t, i)
return err
},

wantErr: errors.Join(fmt.Errorf("%w: non-monotonic boundaries: %v", errHist, []float64{-1, 1, -5})),
},
{
name: "Int64ObservableCounter with no validation issues",

Expand Down Expand Up @@ -670,6 +682,17 @@ func TestMeterCreatesInstrumentsValidations(t *testing.T) {

wantErr: fmt.Errorf("%w: _: must start with a letter", ErrInstrumentName),
},
{
name: "Float64Histogram with invalid buckets",

fn: func(t *testing.T, m metric.Meter) error {
i, err := m.Float64Histogram("histogram", metric.WithExplicitBucketBoundaries(-1, 1, -5))
assert.NotNil(t, i)
return err
},

wantErr: errors.Join(fmt.Errorf("%w: non-monotonic boundaries: %v", errHist, []float64{-1, 1, -5})),
},
{
name: "Float64ObservableCounter with no validation issues",

Expand Down Expand Up @@ -1970,3 +1993,63 @@ func TestMalformedSelectors(t *testing.T) {
})
}
}

// TestHistogramBucketPrecedenceOrdering verifies which explicit bucket
// boundaries win when supplied at multiple levels: the SDK default is
// overridden by the reader's aggregation selector, which is overridden by the
// instrument's WithExplicitBucketBoundaries option, which is overridden by a
// matching view.
func TestHistogramBucketPrecedenceOrdering(t *testing.T) {
	defaultBuckets := []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}
	aggregationSelector := func(InstrumentKind) Aggregation {
		return AggregationExplicitBucketHistogram{Boundaries: []float64{0, 1, 2, 3, 4, 5}}
	}
	testCases := []struct {
		desc                     string
		reader                   Reader
		views                    []View
		histogramOpts            []metric.Float64HistogramOption
		expectedBucketBoundaries []float64
	}{
		{
			desc:                     "default",
			reader:                   NewManualReader(),
			expectedBucketBoundaries: defaultBuckets,
		},
		{
			desc:                     "custom reader aggregation overrides default",
			reader:                   NewManualReader(WithAggregationSelector(aggregationSelector)),
			expectedBucketBoundaries: []float64{0, 1, 2, 3, 4, 5},
		},
		{
			desc:   "overridden by histogram option",
			reader: NewManualReader(WithAggregationSelector(aggregationSelector)),
			histogramOpts: []metric.Float64HistogramOption{
				metric.WithExplicitBucketBoundaries(0, 2, 4, 6, 8, 10),
			},
			expectedBucketBoundaries: []float64{0, 2, 4, 6, 8, 10},
		},
		{
			desc:   "overridden by view",
			reader: NewManualReader(WithAggregationSelector(aggregationSelector)),
			histogramOpts: []metric.Float64HistogramOption{
				metric.WithExplicitBucketBoundaries(0, 2, 4, 6, 8, 10),
			},
			views: []View{NewView(Instrument{Name: "*"}, Stream{
				Aggregation: AggregationExplicitBucketHistogram{Boundaries: []float64{0, 3, 6, 9, 12, 15}},
			})},
			expectedBucketBoundaries: []float64{0, 3, 6, 9, 12, 15},
		},
	}
	for _, tc := range testCases {
		t.Run(tc.desc, func(t *testing.T) {
			mp := NewMeterProvider(WithView(tc.views...), WithReader(tc.reader))
			meter := mp.Meter("TestHistogramBucketPrecedenceOrdering")
			hist, err := meter.Float64Histogram("sync.float64.histogram", tc.histogramOpts...)
			require.NoError(t, err)
			hist.Record(context.Background(), 1)

			var rm metricdata.ResourceMetrics
			require.NoError(t, tc.reader.Collect(context.Background(), &rm))
			require.Len(t, rm.ScopeMetrics, 1)
			require.Len(t, rm.ScopeMetrics[0].Metrics, 1)

			data, ok := rm.ScopeMetrics[0].Metrics[0].Data.(metricdata.Histogram[float64])
			require.True(t, ok)
			require.Len(t, data.DataPoints, 1)
			assert.Equal(t, tc.expectedBucketBoundaries, data.DataPoints[0].Bounds)
		})
	}
}
81 changes: 56 additions & 25 deletions sdk/metric/pipeline.go
Expand Up @@ -231,7 +231,7 @@ func newInserter[N int64 | float64](p *pipeline, vc *cache[string, instID]) *ins
//
// If an instrument is determined to use a Drop aggregation, that instrument is
// not inserted nor returned.
func (i *inserter[N]) Instrument(inst Instrument) ([]aggregate.Measure[N], error) {
func (i *inserter[N]) Instrument(inst Instrument, readerAggregation Aggregation) ([]aggregate.Measure[N], error) {
var (
matched bool
measures []aggregate.Measure[N]
Expand All @@ -245,8 +245,7 @@ func (i *inserter[N]) Instrument(inst Instrument) ([]aggregate.Measure[N], error
continue
}
matched = true

in, id, err := i.cachedAggregator(inst.Scope, inst.Kind, stream)
in, id, err := i.cachedAggregator(inst.Scope, inst.Kind, stream, readerAggregation)
if err != nil {
errs.append(err)
}
Expand All @@ -271,7 +270,7 @@ func (i *inserter[N]) Instrument(inst Instrument) ([]aggregate.Measure[N], error
Description: inst.Description,
Unit: inst.Unit,
}
in, _, err := i.cachedAggregator(inst.Scope, inst.Kind, stream)
in, _, err := i.cachedAggregator(inst.Scope, inst.Kind, stream, readerAggregation)
if err != nil {
errs.append(err)
}
Expand All @@ -291,6 +290,31 @@ type aggVal[N int64 | float64] struct {
Err error
}

// readerDefaultAggregation returns the default aggregation for the instrument
// kind based on the reader's aggregation preferences. This is used unless the
// aggregation is overridden with a view.
func (i *inserter[N]) readerDefaultAggregation(kind InstrumentKind) Aggregation {
aggregation := i.pipeline.reader.aggregation(kind)
switch aggregation.(type) {
case nil, AggregationDefault:
// If the reader returns default or nil use the default selector.
aggregation = DefaultAggregationSelector(kind)
default:
// Deep copy and validate before using.
aggregation = aggregation.copy()
if err := aggregation.err(); err != nil {
orig := aggregation
aggregation = DefaultAggregationSelector(kind)
global.Error(
err, "using default aggregation instead",
"aggregation", orig,
"replacement", aggregation,
)
}

Check warning on line 313 in sdk/metric/pipeline.go

View check run for this annotation

Codecov / codecov/patch

sdk/metric/pipeline.go#L306-L313

Added lines #L306 - L313 were not covered by tests
}
return aggregation
}

// cachedAggregator returns the appropriate aggregate input and output
// functions for an instrument configuration. If the exact instrument has been
// created within the inst.Scope, those aggregate function instances will be
Expand All @@ -305,29 +329,14 @@ type aggVal[N int64 | float64] struct {
//
// If the instrument defines an unknown or incompatible aggregation, an error
// is returned.
func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind InstrumentKind, stream Stream) (meas aggregate.Measure[N], aggID uint64, err error) {
func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind InstrumentKind, stream Stream, readerAggregation Aggregation) (meas aggregate.Measure[N], aggID uint64, err error) {
switch stream.Aggregation.(type) {
case nil:
// Undefined, nil, means to use the default from the reader.
stream.Aggregation = i.pipeline.reader.aggregation(kind)
switch stream.Aggregation.(type) {
case nil, AggregationDefault:
// If the reader returns default or nil use the default selector.
stream.Aggregation = DefaultAggregationSelector(kind)
default:
// Deep copy and validate before using.
stream.Aggregation = stream.Aggregation.copy()
if err := stream.Aggregation.err(); err != nil {
orig := stream.Aggregation
stream.Aggregation = DefaultAggregationSelector(kind)
global.Error(
err, "using default aggregation instead",
"aggregation", orig,
"replacement", stream.Aggregation,
)
}
}
// The aggregation was not overridden with a view. Use the aggregation
// provided by the reader.
stream.Aggregation = readerAggregation
case AggregationDefault:
// The view explicitly requested the default aggregation.

Check warning on line 339 in sdk/metric/pipeline.go

View check run for this annotation

Codecov / codecov/patch

sdk/metric/pipeline.go#L339

Added line #L339 was not covered by tests
stream.Aggregation = DefaultAggregationSelector(kind)
}

Expand Down Expand Up @@ -596,7 +605,29 @@ func (r resolver[N]) Aggregators(id Instrument) ([]aggregate.Measure[N], error)

errs := &multierror{}
for _, i := range r.inserters {
in, err := i.Instrument(id)
in, err := i.Instrument(id, i.readerDefaultAggregation(id.Kind))
if err != nil {
errs.append(err)
}
measures = append(measures, in...)
}
return measures, errs.errorOrNil()
}

// HistogramAggregators returns the histogram Aggregators that must be updated by the instrument
// defined by key. If boundaries were provided on instrument instantiation, those take precedence
// over boundaries provided by the reader.
func (r resolver[N]) HistogramAggregators(id Instrument, boundaries []float64) ([]aggregate.Measure[N], error) {
var measures []aggregate.Measure[N]

errs := &multierror{}
for _, i := range r.inserters {
agg := i.readerDefaultAggregation(id.Kind)
if histAgg, ok := agg.(AggregationExplicitBucketHistogram); ok && len(boundaries) > 0 {
histAgg.Boundaries = boundaries
agg = histAgg
}
in, err := i.Instrument(id, agg)
if err != nil {
errs.append(err)
}
Expand Down

0 comments on commit b6ed43a

Please sign in to comment.