Skip to content

Commit

Permalink
Metric SDK: Do not export non-observed attribute sets for async instr…
Browse files Browse the repository at this point in the history
…uments (#4290)

* drop non-observed attribute sets

* fix test comment

* add documentation for async callbacks dropping unobserved attributes
  • Loading branch information
dashpole committed Jul 11, 2023
1 parent 8b7bffc commit de26aaa
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 43 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -28,6 +28,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- ⚠️ Metrics SDK Breaking ⚠️ : the `AttributeFilter` fields of the `Stream` from `go.opentelemetry.io/otel/sdk/metric` is replaced by the `AttributeKeys` field.
The `AttributeKeys` fields allows users to specify an allow-list of attributes allowed to be recorded for a view.
This change is made to ensure compatibility with the OpenTelemetry specification. (#4288)
- If an attribute set is omitted from an async callback, the previous value will no longer be exported. (#4290)

### Fixed

Expand Down
24 changes: 9 additions & 15 deletions sdk/metric/internal/aggregate/sum.go
Expand Up @@ -255,10 +255,12 @@ type precomputedDeltaSum[N int64 | float64] struct {
// collection cycle, and the unfiltered-sum is kept for the next collection
// cycle.
func (s *precomputedDeltaSum[N]) Aggregation() metricdata.Aggregation {
newReported := make(map[attribute.Set]N)
s.Lock()
defer s.Unlock()

if len(s.values) == 0 {
s.reported = newReported
return nil
}

Expand All @@ -277,16 +279,12 @@ func (s *precomputedDeltaSum[N]) Aggregation() metricdata.Aggregation {
Time: t,
Value: delta,
})
if delta != 0 {
s.reported[attr] = v
}
value.filtered = N(0)
s.values[attr] = value
// TODO (#3006): This will use an unbounded amount of memory if there
// are unbounded number of attribute sets being aggregated. Attribute
// sets that become "stale" need to be forgotten so this will not
// overload the system.
newReported[attr] = v
// Unused attribute sets do not report.
delete(s.values, attr)
}
// Unused attribute sets are forgotten.
s.reported = newReported
// The delta collection cycle resets.
s.start = t
return out
Expand Down Expand Up @@ -349,12 +347,8 @@ func (s *precomputedCumulativeSum[N]) Aggregation() metricdata.Aggregation {
Time: t,
Value: value.measured + value.filtered,
})
value.filtered = N(0)
s.values[attr] = value
// TODO (#3006): This will use an unbounded amount of memory if there
// are unbounded number of attribute sets being aggregated. Attribute
// sets that become "stale" need to be forgotten so this will not
// overload the system.
// Unused attribute sets do not report.
delete(s.values, attr)
}
return out
}
33 changes: 11 additions & 22 deletions sdk/metric/internal/aggregate/sum_test.go
Expand Up @@ -180,10 +180,9 @@ func TestPreComputedDeltaSum(t *testing.T) {
opt := metricdatatest.IgnoreTimestamp()
metricdatatest.AssertAggregationsEqual(t, want, got, opt)

// Delta values should zero.
// No observation means no metric data
got = agg.Aggregation()
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 0)}
metricdatatest.AssertAggregationsEqual(t, want, got, opt)
metricdatatest.AssertAggregationsEqual(t, nil, got, opt)

agg.(precomputeAggregator[int64]).aggregateFiltered(1, attrs)
got = agg.Aggregation()
Expand All @@ -193,13 +192,8 @@ func TestPreComputedDeltaSum(t *testing.T) {

// Filtered values should not persist.
got = agg.Aggregation()
// measured(+): 1, previous(-): 2, filtered(+): 0
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, -1)}
metricdatatest.AssertAggregationsEqual(t, want, got, opt)
got = agg.Aggregation()
// measured(+): 1, previous(-): 1, filtered(+): 0
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 0)}
metricdatatest.AssertAggregationsEqual(t, want, got, opt)
// No observation means no metric data
metricdatatest.AssertAggregationsEqual(t, nil, got, opt)

// Override set value.
agg.Aggregate(2, attrs)
Expand All @@ -208,8 +202,8 @@ func TestPreComputedDeltaSum(t *testing.T) {
agg.(precomputeAggregator[int64]).aggregateFiltered(3, attrs)
agg.(precomputeAggregator[int64]).aggregateFiltered(10, attrs)
got = agg.Aggregation()
// measured(+): 5, previous(-): 1, filtered(+): 13
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 17)}
// measured(+): 5, previous(-): 0, filtered(+): 13
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 18)}
metricdatatest.AssertAggregationsEqual(t, want, got, opt)

// Filtered values should not persist.
Expand Down Expand Up @@ -251,19 +245,18 @@ func TestPreComputedCumulativeSum(t *testing.T) {
opt := metricdatatest.IgnoreTimestamp()
metricdatatest.AssertAggregationsEqual(t, want, got, opt)

// Cumulative values should persist.
// Cumulative values should not persist.
got = agg.Aggregation()
metricdatatest.AssertAggregationsEqual(t, want, got, opt)
metricdatatest.AssertAggregationsEqual(t, nil, got, opt)

agg.(precomputeAggregator[int64]).aggregateFiltered(1, attrs)
got = agg.Aggregation()
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 2)}
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 1)}
metricdatatest.AssertAggregationsEqual(t, want, got, opt)

// Filtered values should not persist.
got = agg.Aggregation()
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 1)}
metricdatatest.AssertAggregationsEqual(t, want, got, opt)
metricdatatest.AssertAggregationsEqual(t, nil, got, opt)

// Override set value.
agg.Aggregate(5, attrs)
Expand All @@ -276,8 +269,7 @@ func TestPreComputedCumulativeSum(t *testing.T) {

// Filtered values should not persist.
got = agg.Aggregation()
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 5)}
metricdatatest.AssertAggregationsEqual(t, want, got, opt)
metricdatatest.AssertAggregationsEqual(t, nil, got, opt)

// Order should not affect measure.
// Filtered should add.
Expand All @@ -287,9 +279,6 @@ func TestPreComputedCumulativeSum(t *testing.T) {
got = agg.Aggregation()
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 20)}
metricdatatest.AssertAggregationsEqual(t, want, got, opt)
got = agg.Aggregation()
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 7)}
metricdatatest.AssertAggregationsEqual(t, want, got, opt)
}

func TestEmptySumNilAggregation(t *testing.T) {
Expand Down
14 changes: 12 additions & 2 deletions sdk/metric/meter.go
Expand Up @@ -107,6 +107,7 @@ func (m *meter) Int64Histogram(name string, options ...metric.Int64HistogramOpti
// Int64ObservableCounter returns a new instrument identified by name and
// configured with options. The instrument is used to asynchronously record
// increasing int64 measurements once per a measurement collection cycle.
// Only the measurements recorded during the collection cycle are exported.
func (m *meter) Int64ObservableCounter(name string, options ...metric.Int64ObservableCounterOption) (metric.Int64ObservableCounter, error) {
cfg := metric.NewInt64ObservableCounterConfig(options...)
const kind = InstrumentKindObservableCounter
Expand All @@ -121,7 +122,8 @@ func (m *meter) Int64ObservableCounter(name string, options ...metric.Int64Obser

// Int64ObservableUpDownCounter returns a new instrument identified by name and
// configured with options. The instrument is used to asynchronously record
// int64 measurements once per a measurement collection cycle.
// int64 measurements once per a measurement collection cycle. Only the
// measurements recorded during the collection cycle are exported.
func (m *meter) Int64ObservableUpDownCounter(name string, options ...metric.Int64ObservableUpDownCounterOption) (metric.Int64ObservableUpDownCounter, error) {
cfg := metric.NewInt64ObservableUpDownCounterConfig(options...)
const kind = InstrumentKindObservableUpDownCounter
Expand All @@ -137,6 +139,7 @@ func (m *meter) Int64ObservableUpDownCounter(name string, options ...metric.Int6
// Int64ObservableGauge returns a new instrument identified by name and
// configured with options. The instrument is used to asynchronously record
// instantaneous int64 measurements once per a measurement collection cycle.
// Only the measurements recorded during the collection cycle are exported.
func (m *meter) Int64ObservableGauge(name string, options ...metric.Int64ObservableGaugeOption) (metric.Int64ObservableGauge, error) {
cfg := metric.NewInt64ObservableGaugeConfig(options...)
const kind = InstrumentKindObservableGauge
Expand Down Expand Up @@ -194,6 +197,7 @@ func (m *meter) Float64Histogram(name string, options ...metric.Float64Histogram
// Float64ObservableCounter returns a new instrument identified by name and
// configured with options. The instrument is used to asynchronously record
// increasing float64 measurements once per a measurement collection cycle.
// Only the measurements recorded during the collection cycle are exported.
func (m *meter) Float64ObservableCounter(name string, options ...metric.Float64ObservableCounterOption) (metric.Float64ObservableCounter, error) {
cfg := metric.NewFloat64ObservableCounterConfig(options...)
const kind = InstrumentKindObservableCounter
Expand All @@ -208,7 +212,8 @@ func (m *meter) Float64ObservableCounter(name string, options ...metric.Float64O

// Float64ObservableUpDownCounter returns a new instrument identified by name
// and configured with options. The instrument is used to asynchronously record
// float64 measurements once per a measurement collection cycle.
// float64 measurements once per a measurement collection cycle. Only the
// measurements recorded during the collection cycle are exported.
func (m *meter) Float64ObservableUpDownCounter(name string, options ...metric.Float64ObservableUpDownCounterOption) (metric.Float64ObservableUpDownCounter, error) {
cfg := metric.NewFloat64ObservableUpDownCounterConfig(options...)
const kind = InstrumentKindObservableUpDownCounter
Expand All @@ -224,6 +229,7 @@ func (m *meter) Float64ObservableUpDownCounter(name string, options ...metric.Fl
// Float64ObservableGauge returns a new instrument identified by name and
// configured with options. The instrument is used to asynchronously record
// instantaneous float64 measurements once per a measurement collection cycle.
// Only the measurements recorded during the collection cycle are exported.
func (m *meter) Float64ObservableGauge(name string, options ...metric.Float64ObservableGaugeOption) (metric.Float64ObservableGauge, error) {
cfg := metric.NewFloat64ObservableGaugeConfig(options...)
const kind = InstrumentKindObservableGauge
Expand Down Expand Up @@ -272,6 +278,10 @@ func isAlphanumeric(c rune) bool {
// Only instruments from this meter can be registered with f, an error is
// returned if other instrument are provided.
//
// Only observations made in the callback will be exported. Unlike synchronous
// instruments, asynchronous callbacks can "forget" attribute sets that are no
// longer relevant by omitting the observation during the callback.
//
// The returned Registration can be used to unregister f.
func (m *meter) RegisterCallback(f metric.Callback, insts ...metric.Observable) (metric.Registration, error) {
if len(insts) == 0 {
Expand Down
6 changes: 2 additions & 4 deletions sdk/metric/meter_test.go
Expand Up @@ -1687,8 +1687,7 @@ func TestObservableExample(t *testing.T) {
Temporality: temporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
// Thread 1 remains at last measured value.
{Attributes: thread1, Value: 60},
// Thread 1 is no longer exported.
{Attributes: thread2, Value: 53},
{Attributes: thread3, Value: 5},
},
Expand Down Expand Up @@ -1762,8 +1761,7 @@ func TestObservableExample(t *testing.T) {
Temporality: temporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
// Thread 1 remains at last measured value.
{Attributes: thread1, Value: 0},
// Thread 1 is no longer exported.
{Attributes: thread2, Value: 6},
{Attributes: thread3, Value: 5},
},
Expand Down

0 comments on commit de26aaa

Please sign in to comment.