Skip to content

Commit

Permalink
drop non-observed attribute sets
Browse files Browse the repository at this point in the history
  • Loading branch information
dashpole committed Jul 7, 2023
1 parent c404a30 commit 271d230
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 41 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -25,6 +25,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Count the Collect time in the PeriodicReader timeout. (#4221)
- `New` in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc` returns `*Exporter` instead of `"go.opentelemetry.io/otel/sdk/metric".Exporter`. (#4272)
- `New` in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp` returns `*Exporter` instead of `"go.opentelemetry.io/otel/sdk/metric".Exporter`. (#4272)
- If an attribute set is omitted from an async callback, the previous value will no longer be exported. (#4290)

### Fixed

Expand Down
24 changes: 9 additions & 15 deletions sdk/metric/internal/aggregate/sum.go
Expand Up @@ -255,10 +255,12 @@ type precomputedDeltaSum[N int64 | float64] struct {
// collection cycle, and the unfiltered-sum is kept for the next collection
// cycle.
func (s *precomputedDeltaSum[N]) Aggregation() metricdata.Aggregation {
newReported := make(map[attribute.Set]N)
s.Lock()
defer s.Unlock()

if len(s.values) == 0 {
s.reported = newReported
return nil
}

Expand All @@ -277,16 +279,12 @@ func (s *precomputedDeltaSum[N]) Aggregation() metricdata.Aggregation {
Time: t,
Value: delta,
})
if delta != 0 {
s.reported[attr] = v
}
value.filtered = N(0)
s.values[attr] = value
// TODO (#3006): This will use an unbounded amount of memory if there
// are unbounded number of attribute sets being aggregated. Attribute
// sets that become "stale" need to be forgotten so this will not
// overload the system.
newReported[attr] = v
// Unused attribute sets do not report.
delete(s.values, attr)
}
// Unused attribute sets are forgotten.
s.reported = newReported
// The delta collection cycle resets.
s.start = t
return out
Expand Down Expand Up @@ -349,12 +347,8 @@ func (s *precomputedCumulativeSum[N]) Aggregation() metricdata.Aggregation {
Time: t,
Value: value.measured + value.filtered,
})
value.filtered = N(0)
s.values[attr] = value
// TODO (#3006): This will use an unbounded amount of memory if there
// are unbounded number of attribute sets being aggregated. Attribute
// sets that become "stale" need to be forgotten so this will not
// overload the system.
// Unused attribute sets do not report.
delete(s.values, attr)
}
return out
}
33 changes: 11 additions & 22 deletions sdk/metric/internal/aggregate/sum_test.go
Expand Up @@ -180,10 +180,9 @@ func TestPreComputedDeltaSum(t *testing.T) {
opt := metricdatatest.IgnoreTimestamp()
metricdatatest.AssertAggregationsEqual(t, want, got, opt)

// Delta values should zero.
// No observation means no metric data
got = agg.Aggregation()
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 0)}
metricdatatest.AssertAggregationsEqual(t, want, got, opt)
metricdatatest.AssertAggregationsEqual(t, nil, got, opt)

agg.(precomputeAggregator[int64]).aggregateFiltered(1, attrs)
got = agg.Aggregation()
Expand All @@ -193,13 +192,8 @@ func TestPreComputedDeltaSum(t *testing.T) {

// Filtered values should not persist.
got = agg.Aggregation()
// measured(+): 1, previous(-): 2, filtered(+): 0
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, -1)}
metricdatatest.AssertAggregationsEqual(t, want, got, opt)
got = agg.Aggregation()
// measured(+): 1, previous(-): 1, filtered(+): 0
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 0)}
metricdatatest.AssertAggregationsEqual(t, want, got, opt)
// No observation means no metric data
metricdatatest.AssertAggregationsEqual(t, nil, got, opt)

// Override set value.
agg.Aggregate(2, attrs)
Expand All @@ -208,8 +202,8 @@ func TestPreComputedDeltaSum(t *testing.T) {
agg.(precomputeAggregator[int64]).aggregateFiltered(3, attrs)
agg.(precomputeAggregator[int64]).aggregateFiltered(10, attrs)
got = agg.Aggregation()
// measured(+): 5, previous(-): 1, filtered(+): 13
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 17)}
// measured(+): 5, previous(-): 8, filtered(+): 13
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 18)}
metricdatatest.AssertAggregationsEqual(t, want, got, opt)

// Filtered values should not persist.
Expand Down Expand Up @@ -251,19 +245,18 @@ func TestPreComputedCumulativeSum(t *testing.T) {
opt := metricdatatest.IgnoreTimestamp()
metricdatatest.AssertAggregationsEqual(t, want, got, opt)

// Cumulative values should persist.
// Cumulative values should not persist.
got = agg.Aggregation()
metricdatatest.AssertAggregationsEqual(t, want, got, opt)
metricdatatest.AssertAggregationsEqual(t, nil, got, opt)

agg.(precomputeAggregator[int64]).aggregateFiltered(1, attrs)
got = agg.Aggregation()
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 2)}
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 1)}
metricdatatest.AssertAggregationsEqual(t, want, got, opt)

// Filtered values should not persist.
got = agg.Aggregation()
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 1)}
metricdatatest.AssertAggregationsEqual(t, want, got, opt)
metricdatatest.AssertAggregationsEqual(t, nil, got, opt)

// Override set value.
agg.Aggregate(5, attrs)
Expand All @@ -276,8 +269,7 @@ func TestPreComputedCumulativeSum(t *testing.T) {

// Filtered values should not persist.
got = agg.Aggregation()
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 5)}
metricdatatest.AssertAggregationsEqual(t, want, got, opt)
metricdatatest.AssertAggregationsEqual(t, nil, got, opt)

// Order should not affect measure.
// Filtered should add.
Expand All @@ -287,9 +279,6 @@ func TestPreComputedCumulativeSum(t *testing.T) {
got = agg.Aggregation()
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 20)}
metricdatatest.AssertAggregationsEqual(t, want, got, opt)
got = agg.Aggregation()
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 7)}
metricdatatest.AssertAggregationsEqual(t, want, got, opt)
}

func TestEmptySumNilAggregation(t *testing.T) {
Expand Down
6 changes: 2 additions & 4 deletions sdk/metric/meter_test.go
Expand Up @@ -1692,8 +1692,7 @@ func TestObservableExample(t *testing.T) {
Temporality: temporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
// Thread 1 remains at last measured value.
{Attributes: thread1, Value: 60},
// Thread 1 is no longer exported.
{Attributes: thread2, Value: 53},
{Attributes: thread3, Value: 5},
},
Expand Down Expand Up @@ -1767,8 +1766,7 @@ func TestObservableExample(t *testing.T) {
Temporality: temporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
// Thread 1 remains at last measured value.
{Attributes: thread1, Value: 0},
// Thread 1 is no longer exported.
{Attributes: thread2, Value: 6},
{Attributes: thread3, Value: 5},
},
Expand Down

0 comments on commit 271d230

Please sign in to comment.