Skip to content

Commit

Permalink
Metric SDK: Remove distiction between filtered and unfiltered
Browse files Browse the repository at this point in the history
attributes.
  • Loading branch information
dashpole committed Jul 13, 2023
1 parent 55fb2bb commit 385c4d4
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 294 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -29,6 +29,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
The `AttributeKeys` fields allows users to specify an allow-list of attributes allowed to be recorded for a view.
This change is made to ensure compatibility with the OpenTelemetry specification. (#4288)
- If an attribute set is omitted from an async callback, the previous value will no longer be exported. (#4290)
- If an attribute set is Observed multiple times in an async callback, the values will be summed instead of the last observation winning. (#4289)

### Fixed

Expand Down
19 changes: 0 additions & 19 deletions sdk/metric/internal/aggregate/aggregator.go
Expand Up @@ -38,22 +38,3 @@ type Aggregator[N int64 | float64] interface {
// measurements made and ends an aggregation cycle.
Aggregation() metricdata.Aggregation
}

// precomputeAggregator is an Aggregator that receives values to aggregate that
// have been pre-computed by the caller.
type precomputeAggregator[N int64 | float64] interface {
// The Aggregate method of the embedded Aggregator is used to record
// pre-computed measurements, scoped by attributes that have not been
// filtered by an attribute filter.
Aggregator[N]

// aggregateFiltered records measurements scoped by attributes that have
// been filtered by an attribute filter.
//
// Pre-computed measurements of filtered attributes need to be recorded
// separate from those that haven't been filtered so they can be added to
// the non-filtered pre-computed measurements in a collection cycle and
// then resets after the cycle (the non-filtered pre-computed measurements
// are not reset).
aggregateFiltered(N, attribute.Set)
}
43 changes: 0 additions & 43 deletions sdk/metric/internal/aggregate/filter.go
Expand Up @@ -27,9 +27,6 @@ func NewFilter[N int64 | float64](agg Aggregator[N], fn attribute.Filter) Aggreg
if fn == nil {
return agg
}
if fa, ok := agg.(precomputeAggregator[N]); ok {
return newPrecomputedFilter(fa, fn)
}
return newFilter(agg, fn)
}

Expand Down Expand Up @@ -68,43 +65,3 @@ func (f *filter[N]) Aggregate(measurement N, attr attribute.Set) {
func (f *filter[N]) Aggregation() metricdata.Aggregation {
return f.aggregator.Aggregation()
}

// precomputedFilter is an aggregator that applies attribute filter when
// Aggregating for pre-computed Aggregations. The pre-computed Aggregations
// need to operate normally when no attribute filtering is done (for sums this
// means setting the value), but when attribute filtering is done it needs to
// be added to any set value.
type precomputedFilter[N int64 | float64] struct {
filter attribute.Filter
aggregator precomputeAggregator[N]
}

// newPrecomputedFilter returns a precomputedFilter Aggregator that wraps agg
// with the attribute filter fn.
//
// This should not be used to wrap a non-pre-computed Aggregator. Use a
// precomputedFilter instead.
func newPrecomputedFilter[N int64 | float64](agg precomputeAggregator[N], fn attribute.Filter) *precomputedFilter[N] {
return &precomputedFilter[N]{
filter: fn,
aggregator: agg,
}
}

// Aggregate records the measurement, scoped by attr, and aggregates it
// into an aggregation.
func (f *precomputedFilter[N]) Aggregate(measurement N, attr attribute.Set) {
fAttr, _ := attr.Filter(f.filter)
if fAttr.Equals(&attr) {
// No filtering done.
f.aggregator.Aggregate(measurement, fAttr)
} else {
f.aggregator.aggregateFiltered(measurement, fAttr)
}
}

// Aggregation returns an Aggregation, for all the aggregated
// measurements made and ends an aggregation cycle.
func (f *precomputedFilter[N]) Aggregation() metricdata.Aggregation {
return f.aggregator.Aggregation()
}
89 changes: 0 additions & 89 deletions sdk/metric/internal/aggregate/filter_test.go
Expand Up @@ -15,8 +15,6 @@
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"

import (
"fmt"
"strings"
"sync"
"testing"

Expand Down Expand Up @@ -196,90 +194,3 @@ func TestFilterConcurrent(t *testing.T) {
testFilterConcurrent[float64](t)
})
}

func TestPrecomputedFilter(t *testing.T) {
t.Run("Int64", testPrecomputedFilter[int64]())
t.Run("Float64", testPrecomputedFilter[float64]())
}

func testPrecomputedFilter[N int64 | float64]() func(t *testing.T) {
return func(t *testing.T) {
agg := newTestFilterAgg[N]()
f := NewFilter[N](agg, testAttributeFilter)
require.IsType(t, &precomputedFilter[N]{}, f)

var (
powerLevel = attribute.Int("power-level", 9000)
user = attribute.String("user", "Alice")
admin = attribute.Bool("admin", true)
)
a := attribute.NewSet(powerLevel)
key := a
f.Aggregate(1, a)
assert.Equal(t, N(1), agg.values[key].measured, str(a))
assert.Equal(t, N(0), agg.values[key].filtered, str(a))

a = attribute.NewSet(powerLevel, user)
f.Aggregate(2, a)
assert.Equal(t, N(1), agg.values[key].measured, str(a))
assert.Equal(t, N(2), agg.values[key].filtered, str(a))

a = attribute.NewSet(powerLevel, user, admin)
f.Aggregate(3, a)
assert.Equal(t, N(1), agg.values[key].measured, str(a))
assert.Equal(t, N(5), agg.values[key].filtered, str(a))

a = attribute.NewSet(powerLevel)
f.Aggregate(2, a)
assert.Equal(t, N(2), agg.values[key].measured, str(a))
assert.Equal(t, N(5), agg.values[key].filtered, str(a))

a = attribute.NewSet(user)
f.Aggregate(3, a)
assert.Equal(t, N(2), agg.values[key].measured, str(a))
assert.Equal(t, N(5), agg.values[key].filtered, str(a))
assert.Equal(t, N(3), agg.values[*attribute.EmptySet()].filtered, str(a))

_ = f.Aggregation()
assert.Equal(t, 1, agg.aggregationN, "failed to propagate Aggregation")
}
}

func str(a attribute.Set) string {
iter := a.Iter()
out := make([]string, 0, iter.Len())
for iter.Next() {
kv := iter.Attribute()
out = append(out, fmt.Sprintf("%s:%#v", kv.Key, kv.Value.AsInterface()))
}
return strings.Join(out, ",")
}

type testFilterAgg[N int64 | float64] struct {
values map[attribute.Set]precomputedValue[N]
aggregationN int
}

func newTestFilterAgg[N int64 | float64]() *testFilterAgg[N] {
return &testFilterAgg[N]{
values: make(map[attribute.Set]precomputedValue[N]),
}
}

func (a *testFilterAgg[N]) Aggregate(val N, attr attribute.Set) {
v := a.values[attr]
v.measured = val
a.values[attr] = v
}

// nolint: unused // Used to agg filtered.
func (a *testFilterAgg[N]) aggregateFiltered(val N, attr attribute.Set) {
v := a.values[attr]
v.filtered += val
a.values[attr] = v
}

func (a *testFilterAgg[N]) Aggregation() metricdata.Aggregation {
a.aggregationN++
return nil
}
82 changes: 12 additions & 70 deletions sdk/metric/internal/aggregate/sum.go
Expand Up @@ -158,63 +158,6 @@ func (s *cumulativeSum[N]) Aggregation() metricdata.Aggregation {
return out
}

// precomputedValue is the recorded measurement value for a set of attributes.
type precomputedValue[N int64 | float64] struct {
// measured is the last value measured for a set of attributes that were
// not filtered.
measured N
// filtered is the sum of values from measurements that had their
// attributes filtered.
filtered N
}

// precomputedMap is the storage for precomputed sums.
type precomputedMap[N int64 | float64] struct {
sync.Mutex
values map[attribute.Set]precomputedValue[N]
}

func newPrecomputedMap[N int64 | float64]() *precomputedMap[N] {
return &precomputedMap[N]{
values: make(map[attribute.Set]precomputedValue[N]),
}
}

// Aggregate records value with the unfiltered attributes attr.
//
// If a previous measurement was made for the same attribute set:
//
// - If that measurement's attributes were not filtered, this value overwrite
// that value.
// - If that measurement's attributes were filtered, this value will be
// recorded along side that value.
func (s *precomputedMap[N]) Aggregate(value N, attr attribute.Set) {
s.Lock()
v := s.values[attr]
v.measured = value
s.values[attr] = v
s.Unlock()
}

// aggregateFiltered records value with the filtered attributes attr.
//
// If a previous measurement was made for the same attribute set:
//
// - If that measurement's attributes were not filtered, this value will be
// recorded along side that value.
// - If that measurement's attributes were filtered, this value will be
// added to it.
//
// This method should not be used if attr have not been reduced by an attribute
// filter.
func (s *precomputedMap[N]) aggregateFiltered(value N, attr attribute.Set) { // nolint: unused // Used to agg filtered.
s.Lock()
v := s.values[attr]
v.filtered += value
s.values[attr] = v
s.Unlock()
}

// NewPrecomputedDeltaSum returns an Aggregator that summarizes a set of
// pre-computed sums. Each sum is scoped by attributes and the aggregation
// cycle the measurements were made in.
Expand All @@ -226,17 +169,17 @@ func (s *precomputedMap[N]) aggregateFiltered(value N, attr attribute.Set) { //
// The output Aggregation will report recorded values as delta temporality.
func NewPrecomputedDeltaSum[N int64 | float64](monotonic bool) Aggregator[N] {
return &precomputedDeltaSum[N]{
precomputedMap: newPrecomputedMap[N](),
reported: make(map[attribute.Set]N),
monotonic: monotonic,
start: now(),
valueMap: newValueMap[N](),
reported: make(map[attribute.Set]N),
monotonic: monotonic,
start: now(),
}
}

// precomputedDeltaSum summarizes a set of pre-computed sums recorded over all
// aggregation cycles as the delta of these sums.
type precomputedDeltaSum[N int64 | float64] struct {
*precomputedMap[N]
*valueMap[N]

reported map[attribute.Set]N

Expand Down Expand Up @@ -271,15 +214,14 @@ func (s *precomputedDeltaSum[N]) Aggregation() metricdata.Aggregation {
DataPoints: make([]metricdata.DataPoint[N], 0, len(s.values)),
}
for attr, value := range s.values {
v := value.measured + value.filtered
delta := v - s.reported[attr]
delta := value - s.reported[attr]
out.DataPoints = append(out.DataPoints, metricdata.DataPoint[N]{
Attributes: attr,
StartTime: s.start,
Time: t,
Value: delta,
})
newReported[attr] = v
newReported[attr] = value
// Unused attribute sets do not report.
delete(s.values, attr)
}
Expand All @@ -302,15 +244,15 @@ func (s *precomputedDeltaSum[N]) Aggregation() metricdata.Aggregation {
// temporality.
func NewPrecomputedCumulativeSum[N int64 | float64](monotonic bool) Aggregator[N] {
return &precomputedCumulativeSum[N]{
precomputedMap: newPrecomputedMap[N](),
monotonic: monotonic,
start: now(),
valueMap: newValueMap[N](),
monotonic: monotonic,
start: now(),
}
}

// precomputedCumulativeSum directly records and reports a set of pre-computed sums.
type precomputedCumulativeSum[N int64 | float64] struct {
*precomputedMap[N]
*valueMap[N]

monotonic bool
start time.Time
Expand Down Expand Up @@ -345,7 +287,7 @@ func (s *precomputedCumulativeSum[N]) Aggregation() metricdata.Aggregation {
Attributes: attr,
StartTime: s.start,
Time: t,
Value: value.measured + value.filtered,
Value: value,
})
// Unused attribute sets do not report.
delete(s.values, attr)
Expand Down

0 comments on commit 385c4d4

Please sign in to comment.