Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Metric SDK: Do not export non-observed attribute sets for async instruments #4290

Merged
merged 3 commits into from
Jul 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- ⚠️ Metrics SDK Breaking ⚠️ : the `AttributeFilter` fields of the `Stream` from `go.opentelemetry.io/otel/sdk/metric` is replaced by the `AttributeKeys` field.
The `AttributeKeys` fields allows users to specify an allow-list of attributes allowed to be recorded for a view.
This change is made to ensure compatibility with the OpenTelemetry specification. (#4288)
- If an attribute set is omitted from an async callback, the previous value will no longer be exported. (#4290)

### Fixed

Expand Down
24 changes: 9 additions & 15 deletions sdk/metric/internal/aggregate/sum.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,10 +255,12 @@ type precomputedDeltaSum[N int64 | float64] struct {
// collection cycle, and the unfiltered-sum is kept for the next collection
// cycle.
func (s *precomputedDeltaSum[N]) Aggregation() metricdata.Aggregation {
newReported := make(map[attribute.Set]N)
s.Lock()
defer s.Unlock()

if len(s.values) == 0 {
s.reported = newReported
return nil
}

Expand All @@ -277,16 +279,12 @@ func (s *precomputedDeltaSum[N]) Aggregation() metricdata.Aggregation {
Time: t,
Value: delta,
})
if delta != 0 {
s.reported[attr] = v
}
value.filtered = N(0)
s.values[attr] = value
// TODO (#3006): This will use an unbounded amount of memory if there
// are unbounded number of attribute sets being aggregated. Attribute
// sets that become "stale" need to be forgotten so this will not
// overload the system.
newReported[attr] = v
// Unused attribute sets do not report.
delete(s.values, attr)
}
// Unused attribute sets are forgotten.
s.reported = newReported
// The delta collection cycle resets.
s.start = t
return out
Expand Down Expand Up @@ -349,12 +347,8 @@ func (s *precomputedCumulativeSum[N]) Aggregation() metricdata.Aggregation {
Time: t,
Value: value.measured + value.filtered,
})
value.filtered = N(0)
s.values[attr] = value
// TODO (#3006): This will use an unbounded amount of memory if there
// are unbounded number of attribute sets being aggregated. Attribute
// sets that become "stale" need to be forgotten so this will not
// overload the system.
// Unused attribute sets do not report.
delete(s.values, attr)
}
return out
}
33 changes: 11 additions & 22 deletions sdk/metric/internal/aggregate/sum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,10 +180,9 @@ func TestPreComputedDeltaSum(t *testing.T) {
opt := metricdatatest.IgnoreTimestamp()
metricdatatest.AssertAggregationsEqual(t, want, got, opt)

// Delta values should zero.
// No observation means no metric data
got = agg.Aggregation()
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 0)}
metricdatatest.AssertAggregationsEqual(t, want, got, opt)
metricdatatest.AssertAggregationsEqual(t, nil, got, opt)

agg.(precomputeAggregator[int64]).aggregateFiltered(1, attrs)
got = agg.Aggregation()
Expand All @@ -193,13 +192,8 @@ func TestPreComputedDeltaSum(t *testing.T) {

// Filtered values should not persist.
got = agg.Aggregation()
// measured(+): 1, previous(-): 2, filtered(+): 0
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, -1)}
metricdatatest.AssertAggregationsEqual(t, want, got, opt)
got = agg.Aggregation()
// measured(+): 1, previous(-): 1, filtered(+): 0
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 0)}
metricdatatest.AssertAggregationsEqual(t, want, got, opt)
// No observation means no metric data
metricdatatest.AssertAggregationsEqual(t, nil, got, opt)

// Override set value.
agg.Aggregate(2, attrs)
Expand All @@ -208,8 +202,8 @@ func TestPreComputedDeltaSum(t *testing.T) {
agg.(precomputeAggregator[int64]).aggregateFiltered(3, attrs)
agg.(precomputeAggregator[int64]).aggregateFiltered(10, attrs)
got = agg.Aggregation()
// measured(+): 5, previous(-): 1, filtered(+): 13
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 17)}
// measured(+): 5, previous(-): 0, filtered(+): 13
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 18)}
metricdatatest.AssertAggregationsEqual(t, want, got, opt)

// Filtered values should not persist.
Expand Down Expand Up @@ -251,19 +245,18 @@ func TestPreComputedCumulativeSum(t *testing.T) {
opt := metricdatatest.IgnoreTimestamp()
metricdatatest.AssertAggregationsEqual(t, want, got, opt)

// Cumulative values should persist.
// Cumulative values should not persist.
got = agg.Aggregation()
metricdatatest.AssertAggregationsEqual(t, want, got, opt)
metricdatatest.AssertAggregationsEqual(t, nil, got, opt)

agg.(precomputeAggregator[int64]).aggregateFiltered(1, attrs)
got = agg.Aggregation()
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 2)}
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 1)}
metricdatatest.AssertAggregationsEqual(t, want, got, opt)

// Filtered values should not persist.
got = agg.Aggregation()
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 1)}
metricdatatest.AssertAggregationsEqual(t, want, got, opt)
metricdatatest.AssertAggregationsEqual(t, nil, got, opt)

// Override set value.
agg.Aggregate(5, attrs)
Expand All @@ -276,8 +269,7 @@ func TestPreComputedCumulativeSum(t *testing.T) {

// Filtered values should not persist.
got = agg.Aggregation()
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 5)}
metricdatatest.AssertAggregationsEqual(t, want, got, opt)
metricdatatest.AssertAggregationsEqual(t, nil, got, opt)

// Order should not affect measure.
// Filtered should add.
Expand All @@ -287,9 +279,6 @@ func TestPreComputedCumulativeSum(t *testing.T) {
got = agg.Aggregation()
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 20)}
metricdatatest.AssertAggregationsEqual(t, want, got, opt)
got = agg.Aggregation()
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 7)}
metricdatatest.AssertAggregationsEqual(t, want, got, opt)
}

func TestEmptySumNilAggregation(t *testing.T) {
Expand Down
14 changes: 12 additions & 2 deletions sdk/metric/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ func (m *meter) Int64Histogram(name string, options ...metric.Int64HistogramOpti
// Int64ObservableCounter returns a new instrument identified by name and
// configured with options. The instrument is used to asynchronously record
// increasing int64 measurements once per a measurement collection cycle.
// Only the measurements recorded during the collection cycle are exported.
func (m *meter) Int64ObservableCounter(name string, options ...metric.Int64ObservableCounterOption) (metric.Int64ObservableCounter, error) {
cfg := metric.NewInt64ObservableCounterConfig(options...)
const kind = InstrumentKindObservableCounter
Expand All @@ -121,7 +122,8 @@ func (m *meter) Int64ObservableCounter(name string, options ...metric.Int64Obser

// Int64ObservableUpDownCounter returns a new instrument identified by name and
// configured with options. The instrument is used to asynchronously record
// int64 measurements once per a measurement collection cycle.
// int64 measurements once per a measurement collection cycle. Only the
// measurements recorded during the collection cycle are exported.
func (m *meter) Int64ObservableUpDownCounter(name string, options ...metric.Int64ObservableUpDownCounterOption) (metric.Int64ObservableUpDownCounter, error) {
cfg := metric.NewInt64ObservableUpDownCounterConfig(options...)
const kind = InstrumentKindObservableUpDownCounter
Expand All @@ -137,6 +139,7 @@ func (m *meter) Int64ObservableUpDownCounter(name string, options ...metric.Int6
// Int64ObservableGauge returns a new instrument identified by name and
// configured with options. The instrument is used to asynchronously record
// instantaneous int64 measurements once per a measurement collection cycle.
// Only the measurements recorded during the collection cycle are exported.
func (m *meter) Int64ObservableGauge(name string, options ...metric.Int64ObservableGaugeOption) (metric.Int64ObservableGauge, error) {
cfg := metric.NewInt64ObservableGaugeConfig(options...)
const kind = InstrumentKindObservableGauge
Expand Down Expand Up @@ -194,6 +197,7 @@ func (m *meter) Float64Histogram(name string, options ...metric.Float64Histogram
// Float64ObservableCounter returns a new instrument identified by name and
// configured with options. The instrument is used to asynchronously record
// increasing float64 measurements once per a measurement collection cycle.
// Only the measurements recorded during the collection cycle are exported.
func (m *meter) Float64ObservableCounter(name string, options ...metric.Float64ObservableCounterOption) (metric.Float64ObservableCounter, error) {
cfg := metric.NewFloat64ObservableCounterConfig(options...)
const kind = InstrumentKindObservableCounter
Expand All @@ -208,7 +212,8 @@ func (m *meter) Float64ObservableCounter(name string, options ...metric.Float64O

// Float64ObservableUpDownCounter returns a new instrument identified by name
// and configured with options. The instrument is used to asynchronously record
// float64 measurements once per a measurement collection cycle.
// float64 measurements once per a measurement collection cycle. Only the
// measurements recorded during the collection cycle are exported.
func (m *meter) Float64ObservableUpDownCounter(name string, options ...metric.Float64ObservableUpDownCounterOption) (metric.Float64ObservableUpDownCounter, error) {
cfg := metric.NewFloat64ObservableUpDownCounterConfig(options...)
const kind = InstrumentKindObservableUpDownCounter
Expand All @@ -224,6 +229,7 @@ func (m *meter) Float64ObservableUpDownCounter(name string, options ...metric.Fl
// Float64ObservableGauge returns a new instrument identified by name and
// configured with options. The instrument is used to asynchronously record
// instantaneous float64 measurements once per a measurement collection cycle.
// Only the measurements recorded during the collection cycle are exported.
func (m *meter) Float64ObservableGauge(name string, options ...metric.Float64ObservableGaugeOption) (metric.Float64ObservableGauge, error) {
cfg := metric.NewFloat64ObservableGaugeConfig(options...)
const kind = InstrumentKindObservableGauge
Expand Down Expand Up @@ -272,6 +278,10 @@ func isAlphanumeric(c rune) bool {
// Only instruments from this meter can be registered with f, an error is
// returned if other instrument are provided.
//
// Only observations made in the callback will be exported. Unlike synchronous
// instruments, asynchronous callbacks can "forget" attribute sets that are no
// longer relevant by omitting the observation during the callback.
//
// The returned Registration can be used to unregister f.
func (m *meter) RegisterCallback(f metric.Callback, insts ...metric.Observable) (metric.Registration, error) {
if len(insts) == 0 {
Expand Down
6 changes: 2 additions & 4 deletions sdk/metric/meter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1687,8 +1687,7 @@ func TestObservableExample(t *testing.T) {
Temporality: temporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
// Thread 1 remains at last measured value.
{Attributes: thread1, Value: 60},
// Thread 1 is no longer exported.
{Attributes: thread2, Value: 53},
{Attributes: thread3, Value: 5},
},
Expand Down Expand Up @@ -1762,8 +1761,7 @@ func TestObservableExample(t *testing.T) {
Temporality: temporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
// Thread 1 remains at last measured value.
{Attributes: thread1, Value: 0},
// Thread 1 is no longer exported.
{Attributes: thread2, Value: 6},
{Attributes: thread3, Value: 5},
},
Expand Down