Skip to content

Commit

Permalink
implement WithExplicitBucketBoundaries option in the metric SDK
Browse files Browse the repository at this point in the history
  • Loading branch information
dashpole committed Oct 11, 2023
1 parent 6980c9c commit 30aecd6
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 34 deletions.
40 changes: 36 additions & 4 deletions sdk/metric/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,8 @@ func (m *meter) Int64UpDownCounter(name string, options ...metric.Int64UpDownCou
// distribution of int64 measurements during a computational operation.
func (m *meter) Int64Histogram(name string, options ...metric.Int64HistogramOption) (metric.Int64Histogram, error) {
cfg := metric.NewInt64HistogramConfig(options...)
const kind = InstrumentKindHistogram
p := int64InstProvider{m}
i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
i, err := p.lookupHistogram(name, cfg)
if err != nil {
return i, err
}
Expand Down Expand Up @@ -190,9 +189,8 @@ func (m *meter) Float64UpDownCounter(name string, options ...metric.Float64UpDow
// distribution of float64 measurements during a computational operation.
func (m *meter) Float64Histogram(name string, options ...metric.Float64HistogramOption) (metric.Float64Histogram, error) {
cfg := metric.NewFloat64HistogramConfig(options...)
const kind = InstrumentKindHistogram
p := float64InstProvider{m}
i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
i, err := p.lookupHistogram(name, cfg)
if err != nil {
return i, err
}
Expand Down Expand Up @@ -456,12 +454,29 @@ func (p int64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([]ag
return p.int64Resolver.Aggregators(inst)
}

func (p int64InstProvider) histogramAggs(name string, cfg metric.Int64HistogramConfig) ([]aggregate.Measure[int64], error) {
inst := Instrument{
Name: name,
Description: cfg.Description(),
Unit: cfg.Unit(),
Kind: InstrumentKindHistogram,
Scope: p.scope,
}
return p.int64Resolver.HistogramAggregators(inst, cfg.ExplicitBucketBoundaries())
}

// lookup returns the resolved instrumentImpl.
func (p int64InstProvider) lookup(kind InstrumentKind, name, desc, u string) (*int64Inst, error) {
aggs, err := p.aggs(kind, name, desc, u)
return &int64Inst{measures: aggs}, err
}

// lookupHistogram returns the resolved instrumentImpl.
func (p int64InstProvider) lookupHistogram(name string, cfg metric.Int64HistogramConfig) (*int64Inst, error) {
aggs, err := p.histogramAggs(name, cfg)
return &int64Inst{measures: aggs}, err
}

// float64InstProvider provides float64 OpenTelemetry instruments.
type float64InstProvider struct{ *meter }

Expand All @@ -476,12 +491,29 @@ func (p float64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([]
return p.float64Resolver.Aggregators(inst)
}

func (p float64InstProvider) histogramAggs(name string, cfg metric.Float64HistogramConfig) ([]aggregate.Measure[float64], error) {
inst := Instrument{
Name: name,
Description: cfg.Description(),
Unit: cfg.Unit(),
Kind: InstrumentKindHistogram,
Scope: p.scope,
}
return p.float64Resolver.HistogramAggregators(inst, cfg.ExplicitBucketBoundaries())
}

// lookup returns the resolved instrumentImpl.
func (p float64InstProvider) lookup(kind InstrumentKind, name, desc, u string) (*float64Inst, error) {
aggs, err := p.aggs(kind, name, desc, u)
return &float64Inst{measures: aggs}, err
}

// lookupHistogram returns the resolved instrumentImpl.
func (p float64InstProvider) lookupHistogram(name string, cfg metric.Float64HistogramConfig) (*float64Inst, error) {
aggs, err := p.histogramAggs(name, cfg)
return &float64Inst{measures: aggs}, err
}

type int64ObservProvider struct{ *meter }

func (p int64ObservProvider) lookup(kind InstrumentKind, name, desc, u string) (int64Observable, error) {
Expand Down
99 changes: 74 additions & 25 deletions sdk/metric/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func newInserter[N int64 | float64](p *pipeline, vc *cache[string, instID]) *ins
//
// If an instrument is determined to use a Drop aggregation, that instrument is
// not inserted nor returned.
func (i *inserter[N]) Instrument(inst Instrument) ([]aggregate.Measure[N], error) {
func (i *inserter[N]) Instrument(inst Instrument, readerAggregation Aggregation) ([]aggregate.Measure[N], error) {
var (
matched bool
measures []aggregate.Measure[N]
Expand All @@ -245,8 +245,7 @@ func (i *inserter[N]) Instrument(inst Instrument) ([]aggregate.Measure[N], error
continue
}
matched = true

in, id, err := i.cachedAggregator(inst.Scope, inst.Kind, stream)
in, id, err := i.cachedAggregator(inst.Scope, inst.Kind, stream, readerAggregation)
if err != nil {
errs.append(err)
}
Expand All @@ -271,7 +270,7 @@ func (i *inserter[N]) Instrument(inst Instrument) ([]aggregate.Measure[N], error
Description: inst.Description,
Unit: inst.Unit,
}
in, _, err := i.cachedAggregator(inst.Scope, inst.Kind, stream)
in, _, err := i.cachedAggregator(inst.Scope, inst.Kind, stream, readerAggregation)
if err != nil {
errs.append(err)
}
Expand All @@ -291,6 +290,31 @@ type aggVal[N int64 | float64] struct {
Err error
}

// readerDefaultAggregation returns the default aggregation for the instrument
// kind based on the reader's aggregation preferences. This is used unless the
// aggregation is overridden with a view.
func (i *inserter[N]) readerDefaultAggregation(kind InstrumentKind) Aggregation {
aggregation := i.pipeline.reader.aggregation(kind)
switch aggregation.(type) {
case nil, AggregationDefault:
// If the reader returns default or nil use the default selector.
aggregation = DefaultAggregationSelector(kind)
default:
// Deep copy and validate before using.
aggregation = aggregation.copy()
if err := aggregation.err(); err != nil {
orig := aggregation
aggregation = DefaultAggregationSelector(kind)
global.Error(
err, "using default aggregation instead",
"aggregation", orig,
"replacement", aggregation,
)
}
}
return aggregation
}

// cachedAggregator returns the appropriate aggregate input and output
// functions for an instrument configuration. If the exact instrument has been
// created within the inst.Scope, those aggregate function instances will be
Expand All @@ -305,29 +329,14 @@ type aggVal[N int64 | float64] struct {
//
// If the instrument defines an unknown or incompatible aggregation, an error
// is returned.
func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind InstrumentKind, stream Stream) (meas aggregate.Measure[N], aggID uint64, err error) {
func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind InstrumentKind, stream Stream, readerAggregation Aggregation) (meas aggregate.Measure[N], aggID uint64, err error) {
switch stream.Aggregation.(type) {
case nil:
// Undefined, nil, means to use the default from the reader.
stream.Aggregation = i.pipeline.reader.aggregation(kind)
switch stream.Aggregation.(type) {
case nil, AggregationDefault:
// If the reader returns default or nil use the default selector.
stream.Aggregation = DefaultAggregationSelector(kind)
default:
// Deep copy and validate before using.
stream.Aggregation = stream.Aggregation.copy()
if err := stream.Aggregation.err(); err != nil {
orig := stream.Aggregation
stream.Aggregation = DefaultAggregationSelector(kind)
global.Error(
err, "using default aggregation instead",
"aggregation", orig,
"replacement", stream.Aggregation,
)
}
}
// The aggregation was not overridden with a view. Use the aggregation
// provided by the reader.
stream.Aggregation = readerAggregation
case AggregationDefault:
// The view explicitly requested the default aggregation.
stream.Aggregation = DefaultAggregationSelector(kind)
}

Expand Down Expand Up @@ -596,7 +605,26 @@ func (r resolver[N]) Aggregators(id Instrument) ([]aggregate.Measure[N], error)

errs := &multierror{}
for _, i := range r.inserters {
in, err := i.Instrument(id)
in, err := i.Instrument(id, i.readerDefaultAggregation(id.Kind))
if err != nil {
errs.append(err)
}
measures = append(measures, in...)
}
return measures, errs.errorOrNil()
}

// HistogramAggregators returns the histogram Aggregators that must be updated by the instrument
// defined by key. If boundaries were provided on instrument instantiation, those take precedence
// over boundaries provided by the reader.
func (r resolver[N]) HistogramAggregators(id Instrument, boundaries []float64) ([]aggregate.Measure[N], error) {
var measures []aggregate.Measure[N]

errs := &multierror{}
for _, i := range r.inserters {
agg := i.readerDefaultAggregation(id.Kind)
agg = withHistogramBucketBoundaries(agg, boundaries)
in, err := i.Instrument(id, agg)
if err != nil {
errs.append(err)
}
Expand All @@ -605,6 +633,27 @@ func (r resolver[N]) Aggregators(id Instrument) ([]aggregate.Measure[N], error)
return measures, errs.errorOrNil()
}

// withHistogramBucketBoundaries overrides the bucket boundaries on the aggregation, if applicable.
func withHistogramBucketBoundaries(aggregation Aggregation, boundaries []float64) Aggregation {
if len(boundaries) == 0 {
return aggregation
}
if agg, ok := aggregation.(AggregationExplicitBucketHistogram); ok {
orig := agg.Boundaries
agg.Boundaries = boundaries
if err := aggregation.err(); err != nil {
// revert back to the original boundaries
agg.Boundaries = orig
global.Error(
err, "ignoring histogram explicit boundaries on histogram Instrument",
"boundaries", boundaries,
"replacement", orig,
)
}
}
return aggregation
}

type multierror struct {
wrapped error
errors []string
Expand Down
6 changes: 4 additions & 2 deletions sdk/metric/pipeline_registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,8 @@ func testCreateAggregators[N int64 | float64](t *testing.T) {
var c cache[string, instID]
p := newPipeline(nil, tt.reader, tt.views)
i := newInserter[N](p, &c)
input, err := i.Instrument(tt.inst)
readerAggregation := i.readerDefaultAggregation(tt.inst.Kind)
input, err := i.Instrument(tt.inst, readerAggregation)
var comps []aggregate.ComputeAggregation
for _, instSyncs := range p.aggregations {
for _, i := range instSyncs {
Expand All @@ -374,7 +375,8 @@ func testInvalidInstrumentShouldPanic[N int64 | float64]() {
Name: "foo",
Kind: InstrumentKind(255),
}
_, _ = i.Instrument(inst)
readerAggregation := i.readerDefaultAggregation(inst.Kind)
_, _ = i.Instrument(inst, readerAggregation)
}

func TestInvalidInstrumentShouldPanic(t *testing.T) {
Expand Down
8 changes: 5 additions & 3 deletions sdk/metric/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,8 @@ func testDefaultViewImplicit[N int64 | float64]() func(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
var c cache[string, instID]
i := newInserter[N](test.pipe, &c)
got, err := i.Instrument(inst)
readerAggregation := i.readerDefaultAggregation(inst.Kind)
got, err := i.Instrument(inst, readerAggregation)
require.NoError(t, err)
assert.Len(t, got, 1, "default view not applied")
for _, in := range got {
Expand Down Expand Up @@ -372,7 +373,8 @@ func TestInserterCachedAggregatorNameConflict(t *testing.T) {
pipe := newPipeline(nil, NewManualReader(), nil)
i := newInserter[int64](pipe, &vc)

_, origID, err := i.cachedAggregator(scope, kind, stream)
readerAggregation := i.readerDefaultAggregation(kind)
_, origID, err := i.cachedAggregator(scope, kind, stream, readerAggregation)
require.NoError(t, err)

require.Len(t, pipe.aggregations, 1)
Expand All @@ -382,7 +384,7 @@ func TestInserterCachedAggregatorNameConflict(t *testing.T) {
require.Equal(t, name, iSync[0].name)

stream.Name = "RequestCount"
_, id, err := i.cachedAggregator(scope, kind, stream)
_, id, err := i.cachedAggregator(scope, kind, stream, readerAggregation)
require.NoError(t, err)
assert.Equal(t, origID, id, "multiple aggregators for equivalent name")

Expand Down

0 comments on commit 30aecd6

Please sign in to comment.