Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix multi-reader observable counter double-counting bug #4742

Merged
merged 3 commits into from
Dec 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

- Do not parse non-protobuf responses in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp`. (#4719)
- Do not parse non-protobuf responses in `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp`. (#4719)
- Fix a bug where using multiple readers resulted in incorrect asynchronous counter values in `go.opentelemetry.io/otel/sdk/metric`. (#4742)

## [1.20.0/0.43.0] 2023-11-10

Expand Down
28 changes: 19 additions & 9 deletions sdk/metric/instrument.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,9 +270,9 @@ var (
_ metric.Float64ObservableGauge = float64Observable{}
)

func newFloat64Observable(m *meter, kind InstrumentKind, name, desc, u string, meas []aggregate.Measure[float64]) float64Observable {
func newFloat64Observable(m *meter, kind InstrumentKind, name, desc, u string) float64Observable {
return float64Observable{
observable: newObservable(m, kind, name, desc, u, meas),
observable: newObservable[float64](m, kind, name, desc, u),
}
}

Expand All @@ -291,9 +291,9 @@ var (
_ metric.Int64ObservableGauge = int64Observable{}
)

func newInt64Observable(m *meter, kind InstrumentKind, name, desc, u string, meas []aggregate.Measure[int64]) int64Observable {
func newInt64Observable(m *meter, kind InstrumentKind, name, desc, u string) int64Observable {
return int64Observable{
observable: newObservable(m, kind, name, desc, u, meas),
observable: newObservable[int64](m, kind, name, desc, u),
}
}

Expand All @@ -302,10 +302,10 @@ type observable[N int64 | float64] struct {
observablID[N]

meter *meter
measures []aggregate.Measure[N]
measures measures[N]
}

func newObservable[N int64 | float64](m *meter, kind InstrumentKind, name, desc, u string, meas []aggregate.Measure[N]) *observable[N] {
func newObservable[N int64 | float64](m *meter, kind InstrumentKind, name, desc, u string) *observable[N] {
return &observable[N]{
observablID: observablID[N]{
name: name,
Expand All @@ -314,14 +314,24 @@ func newObservable[N int64 | float64](m *meter, kind InstrumentKind, name, desc,
unit: u,
scope: m.scope,
},
meter: m,
measures: meas,
meter: m,
}
}

// observe records the val for the set of attrs.
func (o *observable[N]) observe(val N, s attribute.Set) {
for _, in := range o.measures {
o.measures.observe(val, s)
}

func (o *observable[N]) appendMeasures(meas []aggregate.Measure[N]) {
o.measures = append(o.measures, meas...)
}

type measures[N int64 | float64] []aggregate.Measure[N]

// observe records the val for the set of attrs.
func (m measures[N]) observe(val N, s attribute.Set) {
for _, in := range m {
in(context.Background(), val, s)
}
}
Expand Down
182 changes: 92 additions & 90 deletions sdk/metric/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,20 +104,44 @@
return i, validateInstrumentName(name)
}

// int64ObservableInstrument returns a new observable identified by the Instrument.
// It registers callbacks for each reader's pipeline.
func (m *meter) int64ObservableInstrument(id Instrument, callbacks []metric.Int64Callback) (int64Observable, error) {
inst := newInt64Observable(m, id.Kind, id.Name, id.Description, id.Unit)
for _, insert := range m.int64Resolver.inserters {
// Connect the measure functions for instruments in this pipeline with the
// callbacks for this pipeline.
in, err := insert.Instrument(id, insert.readerDefaultAggregation(id.Kind))
if err != nil {
return inst, err
}

Check warning on line 117 in sdk/metric/meter.go

View check run for this annotation

Codecov / codecov/patch

sdk/metric/meter.go#L116-L117

Added lines #L116 - L117 were not covered by tests
// Drop aggregation
if len(in) == 0 {
continue
}
inst.appendMeasures(in)
for _, cback := range callbacks {
inst := int64Observer{measures: in}
insert.addCallback(func(ctx context.Context) error { return cback(ctx, inst) })
}
}
return inst, validateInstrumentName(id.Name)
}

// Int64ObservableCounter returns a new instrument identified by name and
// configured with options. The instrument is used to asynchronously record
// increasing int64 measurements once per a measurement collection cycle.
// Only the measurements recorded during the collection cycle are exported.
func (m *meter) Int64ObservableCounter(name string, options ...metric.Int64ObservableCounterOption) (metric.Int64ObservableCounter, error) {
cfg := metric.NewInt64ObservableCounterConfig(options...)
const kind = InstrumentKindObservableCounter
p := int64ObservProvider{m}
inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
if err != nil {
return nil, err
id := Instrument{
Name: name,
Description: cfg.Description(),
Unit: cfg.Unit(),
Kind: InstrumentKindObservableCounter,
Scope: m.scope,
}
p.registerCallbacks(inst, cfg.Callbacks())
return inst, validateInstrumentName(name)
return m.int64ObservableInstrument(id, cfg.Callbacks())
}

// Int64ObservableUpDownCounter returns a new instrument identified by name and
Expand All @@ -126,14 +150,14 @@
// measurements recorded during the collection cycle are exported.
func (m *meter) Int64ObservableUpDownCounter(name string, options ...metric.Int64ObservableUpDownCounterOption) (metric.Int64ObservableUpDownCounter, error) {
cfg := metric.NewInt64ObservableUpDownCounterConfig(options...)
const kind = InstrumentKindObservableUpDownCounter
p := int64ObservProvider{m}
inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
if err != nil {
return nil, err
id := Instrument{
Name: name,
Description: cfg.Description(),
Unit: cfg.Unit(),
Kind: InstrumentKindObservableUpDownCounter,
Scope: m.scope,
}
p.registerCallbacks(inst, cfg.Callbacks())
return inst, validateInstrumentName(name)
return m.int64ObservableInstrument(id, cfg.Callbacks())
}

// Int64ObservableGauge returns a new instrument identified by name and
Expand All @@ -142,14 +166,14 @@
// Only the measurements recorded during the collection cycle are exported.
func (m *meter) Int64ObservableGauge(name string, options ...metric.Int64ObservableGaugeOption) (metric.Int64ObservableGauge, error) {
cfg := metric.NewInt64ObservableGaugeConfig(options...)
const kind = InstrumentKindObservableGauge
p := int64ObservProvider{m}
inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
if err != nil {
return nil, err
id := Instrument{
Name: name,
Description: cfg.Description(),
Unit: cfg.Unit(),
Kind: InstrumentKindObservableGauge,
Scope: m.scope,
}
p.registerCallbacks(inst, cfg.Callbacks())
return inst, validateInstrumentName(name)
return m.int64ObservableInstrument(id, cfg.Callbacks())
}

// Float64Counter returns a new instrument identified by name and configured
Expand Down Expand Up @@ -196,20 +220,44 @@
return i, validateInstrumentName(name)
}

// float64ObservableInstrument returns a new observable identified by the Instrument.
// It registers callbacks for each reader's pipeline.
func (m *meter) float64ObservableInstrument(id Instrument, callbacks []metric.Float64Callback) (float64Observable, error) {
inst := newFloat64Observable(m, id.Kind, id.Name, id.Description, id.Unit)
for _, insert := range m.float64Resolver.inserters {
// Connect the measure functions for instruments in this pipeline with the
// callbacks for this pipeline.
in, err := insert.Instrument(id, insert.readerDefaultAggregation(id.Kind))
if err != nil {
return inst, err
}

Check warning on line 233 in sdk/metric/meter.go

View check run for this annotation

Codecov / codecov/patch

sdk/metric/meter.go#L232-L233

Added lines #L232 - L233 were not covered by tests
// Drop aggregation
if len(in) == 0 {
continue
}
inst.appendMeasures(in)
for _, cback := range callbacks {
inst := float64Observer{measures: in}
insert.addCallback(func(ctx context.Context) error { return cback(ctx, inst) })
}
}
return inst, validateInstrumentName(id.Name)
}

// Float64ObservableCounter returns a new instrument identified by name and
// configured with options. The instrument is used to asynchronously record
// increasing float64 measurements once per a measurement collection cycle.
// Only the measurements recorded during the collection cycle are exported.
func (m *meter) Float64ObservableCounter(name string, options ...metric.Float64ObservableCounterOption) (metric.Float64ObservableCounter, error) {
cfg := metric.NewFloat64ObservableCounterConfig(options...)
const kind = InstrumentKindObservableCounter
p := float64ObservProvider{m}
inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
if err != nil {
return nil, err
id := Instrument{
Name: name,
Description: cfg.Description(),
Unit: cfg.Unit(),
Kind: InstrumentKindObservableCounter,
Scope: m.scope,
}
p.registerCallbacks(inst, cfg.Callbacks())
return inst, validateInstrumentName(name)
return m.float64ObservableInstrument(id, cfg.Callbacks())
}

// Float64ObservableUpDownCounter returns a new instrument identified by name
Expand All @@ -218,14 +266,14 @@
// measurements recorded during the collection cycle are exported.
func (m *meter) Float64ObservableUpDownCounter(name string, options ...metric.Float64ObservableUpDownCounterOption) (metric.Float64ObservableUpDownCounter, error) {
cfg := metric.NewFloat64ObservableUpDownCounterConfig(options...)
const kind = InstrumentKindObservableUpDownCounter
p := float64ObservProvider{m}
inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
if err != nil {
return nil, err
id := Instrument{
Name: name,
Description: cfg.Description(),
Unit: cfg.Unit(),
Kind: InstrumentKindObservableUpDownCounter,
Scope: m.scope,
}
p.registerCallbacks(inst, cfg.Callbacks())
return inst, validateInstrumentName(name)
return m.float64ObservableInstrument(id, cfg.Callbacks())
}

// Float64ObservableGauge returns a new instrument identified by name and
Expand All @@ -234,14 +282,14 @@
// Only the measurements recorded during the collection cycle are exported.
func (m *meter) Float64ObservableGauge(name string, options ...metric.Float64ObservableGaugeOption) (metric.Float64ObservableGauge, error) {
cfg := metric.NewFloat64ObservableGaugeConfig(options...)
const kind = InstrumentKindObservableGauge
p := float64ObservProvider{m}
inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
if err != nil {
return nil, err
id := Instrument{
Name: name,
Description: cfg.Description(),
Unit: cfg.Unit(),
Kind: InstrumentKindObservableGauge,
Scope: m.scope,
}
p.registerCallbacks(inst, cfg.Callbacks())
return inst, validateInstrumentName(name)
return m.float64ObservableInstrument(id, cfg.Callbacks())
}

func validateInstrumentName(name string) error {
Expand Down Expand Up @@ -528,65 +576,19 @@
return &float64Inst{measures: aggs}, err
}

type int64ObservProvider struct{ *meter }

func (p int64ObservProvider) lookup(kind InstrumentKind, name, desc, u string) (int64Observable, error) {
aggs, err := (int64InstProvider)(p).aggs(kind, name, desc, u)
return newInt64Observable(p.meter, kind, name, desc, u, aggs), err
}

func (p int64ObservProvider) registerCallbacks(inst int64Observable, cBacks []metric.Int64Callback) {
if inst.observable == nil || len(inst.measures) == 0 {
// Drop aggregator.
return
}

for _, cBack := range cBacks {
p.pipes.registerCallback(p.callback(inst, cBack))
}
}

func (p int64ObservProvider) callback(i int64Observable, f metric.Int64Callback) func(context.Context) error {
inst := int64Observer{int64Observable: i}
return func(ctx context.Context) error { return f(ctx, inst) }
}

type int64Observer struct {
embedded.Int64Observer
int64Observable
measures[int64]
}

func (o int64Observer) Observe(val int64, opts ...metric.ObserveOption) {
c := metric.NewObserveConfig(opts)
o.observe(val, c.Attributes())
}

type float64ObservProvider struct{ *meter }

func (p float64ObservProvider) lookup(kind InstrumentKind, name, desc, u string) (float64Observable, error) {
aggs, err := (float64InstProvider)(p).aggs(kind, name, desc, u)
return newFloat64Observable(p.meter, kind, name, desc, u, aggs), err
}

func (p float64ObservProvider) registerCallbacks(inst float64Observable, cBacks []metric.Float64Callback) {
if inst.observable == nil || len(inst.measures) == 0 {
// Drop aggregator.
return
}

for _, cBack := range cBacks {
p.pipes.registerCallback(p.callback(inst, cBack))
}
}

func (p float64ObservProvider) callback(i float64Observable, f metric.Float64Callback) func(context.Context) error {
inst := float64Observer{float64Observable: i}
return func(ctx context.Context) error { return f(ctx, inst) }
}

type float64Observer struct {
embedded.Float64Observer
float64Observable
measures[float64]
}

func (o float64Observer) Observe(val float64, opts ...metric.ObserveOption) {
Expand Down
13 changes: 10 additions & 3 deletions sdk/metric/meter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1589,15 +1589,16 @@ func TestObservableExample(t *testing.T) {
)

selector := func(InstrumentKind) metricdata.Temporality { return temp }
reader := NewManualReader(WithTemporalitySelector(selector))
reader1 := NewManualReader(WithTemporalitySelector(selector))
reader2 := NewManualReader(WithTemporalitySelector(selector))

allowAll := attribute.NewDenyKeysFilter()
noFiltered := NewView(Instrument{Name: instName}, Stream{Name: instName, AttributeFilter: allowAll})

filter := attribute.NewDenyKeysFilter("tid")
filtered := NewView(Instrument{Name: instName}, Stream{Name: filteredStream, AttributeFilter: filter})

mp := NewMeterProvider(WithReader(reader), WithView(noFiltered, filtered))
mp := NewMeterProvider(WithReader(reader1), WithReader(reader2), WithView(noFiltered, filtered))
meter := mp.Meter(scopeName)

observations := make(map[attribute.Set]int64)
Expand Down Expand Up @@ -1644,7 +1645,13 @@ func TestObservableExample(t *testing.T) {
collect := func(t *testing.T) {
t.Helper()
got := metricdata.ResourceMetrics{}
err := reader.Collect(context.Background(), &got)
dashpole marked this conversation as resolved.
Show resolved Hide resolved
err := reader1.Collect(context.Background(), &got)
require.NoError(t, err)
require.Len(t, got.ScopeMetrics, 1)
metricdatatest.AssertEqual(t, *want, got.ScopeMetrics[0], metricdatatest.IgnoreTimestamp())

got = metricdata.ResourceMetrics{}
err = reader2.Collect(context.Background(), &got)
require.NoError(t, err)
require.Len(t, got.ScopeMetrics, 1)
metricdatatest.AssertEqual(t, *want, got.ScopeMetrics[0], metricdatatest.IgnoreTimestamp())
Expand Down