Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cardinality limiting to the metric SDK as an experimental feature #4457

Merged
merged 23 commits into from
Dec 19, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 7 additions & 5 deletions sdk/metric/internal/aggregate/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ type Builder[N int64 | float64] struct {
// Filter is the attribute filter the aggregate function will use on the
// input of measurements.
Filter attribute.Filter
	// AggregationLimit is the cardinality limit of measurement attributes.
	// Any measurement made with more distinct attribute sets than this limit
	// will be aggregated into a single aggregate for the
	// "otel.metric.overflow" attribute.
	//
	// If AggregationLimit is less than or equal to zero, no limit is applied.
AggregationLimit int
}

func (b Builder[N]) filter(f Measure[N]) Measure[N] {
Expand All @@ -63,7 +65,7 @@ func (b Builder[N]) filter(f Measure[N]) Measure[N] {
func (b Builder[N]) LastValue() (Measure[N], ComputeAggregation) {
// Delta temporality is the only temporality that makes semantic sense for
// a last-value aggregate.
lv := newLastValue[N]()
lv := newLastValue[N](b.AggregationLimit)

return b.filter(lv.measure), func(dest *metricdata.Aggregation) int {
// Ignore if dest is not a metricdata.Gauge. The chance for memory
Expand All @@ -79,7 +81,7 @@ func (b Builder[N]) LastValue() (Measure[N], ComputeAggregation) {
// PrecomputedSum returns a sum aggregate function input and output. The
// arguments passed to the input are expected to be the precomputed sum values.
func (b Builder[N]) PrecomputedSum(monotonic bool) (Measure[N], ComputeAggregation) {
s := newPrecomputedSum[N](monotonic)
s := newPrecomputedSum[N](monotonic, b.AggregationLimit)
switch b.Temporality {
case metricdata.DeltaTemporality:
return b.filter(s.measure), s.delta
Expand All @@ -90,7 +92,7 @@ func (b Builder[N]) PrecomputedSum(monotonic bool) (Measure[N], ComputeAggregati

// Sum returns a sum aggregate function input and output.
func (b Builder[N]) Sum(monotonic bool) (Measure[N], ComputeAggregation) {
s := newSum[N](monotonic)
s := newSum[N](monotonic, b.AggregationLimit)
switch b.Temporality {
case metricdata.DeltaTemporality:
return b.filter(s.measure), s.delta
Expand All @@ -102,7 +104,7 @@ func (b Builder[N]) Sum(monotonic bool) (Measure[N], ComputeAggregation) {
// ExplicitBucketHistogram returns a histogram aggregate function input and
// output.
func (b Builder[N]) ExplicitBucketHistogram(boundaries []float64, noMinMax, noSum bool) (Measure[N], ComputeAggregation) {
h := newHistogram[N](boundaries, noMinMax, noSum)
h := newHistogram[N](boundaries, noMinMax, noSum, b.AggregationLimit)
switch b.Temporality {
case metricdata.DeltaTemporality:
return b.filter(h.measure), h.delta
Expand All @@ -114,7 +116,7 @@ func (b Builder[N]) ExplicitBucketHistogram(boundaries []float64, noMinMax, noSu
// ExponentialBucketHistogram returns a histogram aggregate function input and
// output.
func (b Builder[N]) ExponentialBucketHistogram(maxSize, maxScale int32, noMinMax, noSum bool) (Measure[N], ComputeAggregation) {
h := newExponentialHistogram[N](maxSize, maxScale, noMinMax, noSum)
h := newExponentialHistogram[N](maxSize, maxScale, noMinMax, noSum, b.AggregationLimit)
switch b.Temporality {
case metricdata.DeltaTemporality:
return b.filter(h.measure), h.delta
Expand Down
5 changes: 4 additions & 1 deletion sdk/metric/internal/aggregate/exponential_histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,13 +288,14 @@ func (b *expoBuckets) downscale(delta int) {
// newExponentialHistogram returns an Aggregator that summarizes a set of
// measurements as an exponential histogram. Each histogram is scoped by attributes
// and the aggregation cycle the measurements were made in.
func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool) *expoHistogram[N] {
func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool, limit int) *expoHistogram[N] {
return &expoHistogram[N]{
noSum: noSum,
noMinMax: noMinMax,
maxSize: int(maxSize),
maxScale: int(maxScale),

limit: limit,
values: make(map[attribute.Set]*expoHistogramDataPoint[N]),

start: now(),
Expand All @@ -309,6 +310,7 @@ type expoHistogram[N int64 | float64] struct {
maxSize int
maxScale int

limit int
values map[attribute.Set]*expoHistogramDataPoint[N]
valuesMu sync.Mutex

Expand All @@ -324,6 +326,7 @@ func (e *expoHistogram[N]) measure(_ context.Context, value N, attr attribute.Se
e.valuesMu.Lock()
defer e.valuesMu.Unlock()

attr = limitAttr(attr, e.values, e.limit)
v, ok := e.values[attr]
if !ok {
v = newExpoHistogramDataPoint[N](e.maxSize, e.maxScale, e.noMinMax, e.noSum)
Expand Down
4 changes: 2 additions & 2 deletions sdk/metric/internal/aggregate/exponential_histogram_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func testExpoHistogramMinMaxSumInt64(t *testing.T) {
restore := withHandler(t)
defer restore()

h := newExponentialHistogram[int64](4, 20, false, false)
h := newExponentialHistogram[int64](4, 20, false, false, 0)
for _, v := range tt.values {
h.measure(context.Background(), v, alice)
}
Expand Down Expand Up @@ -230,7 +230,7 @@ func testExpoHistogramMinMaxSumFloat64(t *testing.T) {
restore := withHandler(t)
defer restore()

h := newExponentialHistogram[float64](4, 20, false, false)
h := newExponentialHistogram[float64](4, 20, false, false, 0)
for _, v := range tt.values {
h.measure(context.Background(), v, alice)
}
Expand Down
9 changes: 6 additions & 3 deletions sdk/metric/internal/aggregate/histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,12 @@ type histValues[N int64 | float64] struct {
noSum bool
bounds []float64

limit int
values map[attribute.Set]*buckets[N]
valuesMu sync.Mutex
}

func newHistValues[N int64 | float64](bounds []float64, noSum bool) *histValues[N] {
func newHistValues[N int64 | float64](bounds []float64, noSum bool, limit int) *histValues[N] {
// The responsibility of keeping all buckets correctly associated with the
// passed boundaries is ultimately this type's responsibility. Make a copy
// here so we can always guarantee this. Or, in the case of failure, have
Expand All @@ -69,6 +70,7 @@ func newHistValues[N int64 | float64](bounds []float64, noSum bool) *histValues[
return &histValues[N]{
noSum: noSum,
bounds: b,
limit: limit,
values: make(map[attribute.Set]*buckets[N]),
}
}
Expand All @@ -86,6 +88,7 @@ func (s *histValues[N]) measure(_ context.Context, value N, attr attribute.Set)
s.valuesMu.Lock()
defer s.valuesMu.Unlock()

attr = limitAttr(attr, s.values, s.limit)
b, ok := s.values[attr]
if !ok {
// N+1 buckets. For example:
Expand All @@ -108,9 +111,9 @@ func (s *histValues[N]) measure(_ context.Context, value N, attr attribute.Set)

// newHistogram returns an Aggregator that summarizes a set of measurements as
// an histogram.
func newHistogram[N int64 | float64](boundaries []float64, noMinMax, noSum bool) *histogram[N] {
func newHistogram[N int64 | float64](boundaries []float64, noMinMax, noSum bool, limit int) *histogram[N] {
return &histogram[N]{
histValues: newHistValues[N](boundaries, noSum),
histValues: newHistValues[N](boundaries, noSum, limit),
noMinMax: noMinMax,
start: now(),
}
Expand Down
6 changes: 3 additions & 3 deletions sdk/metric/internal/aggregate/histogram_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ func TestHistogramImmutableBounds(t *testing.T) {
cpB := make([]float64, len(b))
copy(cpB, b)

h := newHistogram[int64](b, false, false)
h := newHistogram[int64](b, false, false, 0)
require.Equal(t, cpB, h.bounds)

b[0] = 10
Expand All @@ -289,7 +289,7 @@ func TestHistogramImmutableBounds(t *testing.T) {
}

func TestCumulativeHistogramImutableCounts(t *testing.T) {
h := newHistogram[int64](bounds, noMinMax, false)
h := newHistogram[int64](bounds, noMinMax, false, 0)
h.measure(context.Background(), 5, alice)

var data metricdata.Aggregation = metricdata.Histogram[int64]{}
Expand All @@ -307,7 +307,7 @@ func TestCumulativeHistogramImutableCounts(t *testing.T) {
func TestDeltaHistogramReset(t *testing.T) {
t.Cleanup(mockTime(now))

h := newHistogram[int64](bounds, noMinMax, false)
h := newHistogram[int64](bounds, noMinMax, false, 0)

var data metricdata.Aggregation = metricdata.Histogram[int64]{}
require.Equal(t, 0, h.delta(&data))
Expand Down
4 changes: 3 additions & 1 deletion sdk/metric/internal/aggregate/lastvalue.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,22 @@ type datapoint[N int64 | float64] struct {
value N
}

func newLastValue[N int64 | float64]() *lastValue[N] {
// newLastValue returns a lastValue aggregate that limits the number of
// distinct attribute sets it stores to limit. A limit <= 0 means no limit is
// applied (see limitAttr).
func newLastValue[N int64 | float64](limit int) *lastValue[N] {
	return &lastValue[N]{
		// Store the limit; dropping it here would leave lastValue.limit at 0
		// and disable cardinality limiting for last-value aggregates.
		limit:  limit,
		values: make(map[attribute.Set]datapoint[N]),
	}
}

// lastValue summarizes a set of measurements as the last one made.
type lastValue[N int64 | float64] struct {
	sync.Mutex

	// limit is the cardinality limit forwarded to limitAttr on each measure
	// call; a value <= 0 disables limiting.
	limit  int
	// values holds the most recent datapoint recorded for each attribute set.
	values map[attribute.Set]datapoint[N]
}

// measure records value, stamped with the current time, as the latest
// datapoint for attr. Attribute sets beyond the cardinality limit are folded
// into the overflow set by limitAttr.
func (s *lastValue[N]) measure(ctx context.Context, value N, attr attribute.Set) {
	point := datapoint[N]{timestamp: now(), value: value}

	s.Lock()
	defer s.Unlock()

	s.values[limitAttr(attr, s.values, s.limit)] = point
}
Expand Down
37 changes: 37 additions & 0 deletions sdk/metric/internal/aggregate/limit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"

import "go.opentelemetry.io/otel/attribute"

// overflowSet is the attribute set used to record a measurement when adding
// another distinct attribute set to the aggregate would exceed the aggregate
// limit.
var overflowSet = attribute.NewSet(attribute.Bool("otel.metric.overflow", true))

// limitAttr checks if adding a measurement for the attribute set a will
// exceed the limit of the already measured values in m. If it will,
// overflowSet is returned instead. Otherwise, if it will not exceed the
// limit, or the limit is not set (limit <= 0), a is returned.
//
// The limit is reached once m holds limit-1 distinct attribute sets: the last
// slot is reserved for overflowSet so the total number of data points an
// aggregate produces never exceeds limit.
func limitAttr[V any](a attribute.Set, m map[attribute.Set]V, limit int) attribute.Set {
	if limit > 0 {
		// Sets already present in m always continue to be recorded under
		// their own key; only new sets are redirected to overflowSet.
		_, exists := m[a]
		if !exists && len(m) >= limit-1 {
			return overflowSet
		}
	}

	return a
}
47 changes: 47 additions & 0 deletions sdk/metric/internal/aggregate/limit_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"

import (
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/otel/attribute"
)

func TestLimitAttr(t *testing.T) {
	m := map[attribute.Set]struct{}{alice: {}}

	testcases := []struct {
		name  string
		attr  attribute.Set
		limit int
		want  attribute.Set
	}{
		{"NoLimit/Exists", alice, 0, alice},
		{"NoLimit/DoesNotExist", bob, 0, bob},
		{"NotAtLimit/Exists", alice, 3, alice},
		{"NotAtLimit/DoesNotExist", bob, 3, bob},
		{"AtLimit/Exists", alice, 2, alice},
		{"AtLimit/DoesNotExist", bob, 2, overflowSet},
	}

	for _, tc := range testcases {
		t.Run(tc.name, func(t *testing.T) {
			assert.Equal(t, tc.want, limitAttr(tc.attr, m, tc.limit))
		})
	}
}
17 changes: 11 additions & 6 deletions sdk/metric/internal/aggregate/sum.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,25 +26,30 @@ import (
// valueMap is the storage for sums.
type valueMap[N int64 | float64] struct {
	sync.Mutex
	// limit is the cardinality limit forwarded to limitAttr on each measure
	// call; a value <= 0 disables limiting.
	limit  int
	// values holds the running sum for each measured attribute set.
	values map[attribute.Set]N
}

func newValueMap[N int64 | float64]() *valueMap[N] {
return &valueMap[N]{values: make(map[attribute.Set]N)}
// newValueMap returns an empty valueMap configured with the given
// cardinality limit.
func newValueMap[N int64 | float64](limit int) *valueMap[N] {
	vm := valueMap[N]{values: make(map[attribute.Set]N)}
	vm.limit = limit
	return &vm
}

// measure adds value to the sum stored for attr, creating the entry if one
// does not yet exist. Attribute sets beyond the cardinality limit are folded
// into the overflow set by limitAttr.
func (s *valueMap[N]) measure(_ context.Context, value N, attr attribute.Set) {
	s.Lock()
	defer s.Unlock()

	s.values[limitAttr(attr, s.values, s.limit)] += value
}

// newSum returns an aggregator that summarizes a set of measurements as their
// arithmetic sum. Each sum is scoped by attributes and the aggregation cycle
// the measurements were made in.
func newSum[N int64 | float64](monotonic bool) *sum[N] {
func newSum[N int64 | float64](monotonic bool, limit int) *sum[N] {
return &sum[N]{
valueMap: newValueMap[N](),
valueMap: newValueMap[N](limit),
monotonic: monotonic,
start: now(),
}
Expand Down Expand Up @@ -129,9 +134,9 @@ func (s *sum[N]) cumulative(dest *metricdata.Aggregation) int {
// newPrecomputedSum returns an aggregator that summarizes a set of
// observations as their arithmetic sum. Each sum is scoped by attributes and
// the aggregation cycle the measurements were made in.
func newPrecomputedSum[N int64 | float64](monotonic bool) *precomputedSum[N] {
func newPrecomputedSum[N int64 | float64](monotonic bool, limit int) *precomputedSum[N] {
return &precomputedSum[N]{
valueMap: newValueMap[N](),
valueMap: newValueMap[N](limit),
monotonic: monotonic,
start: now(),
}
Expand Down
59 changes: 59 additions & 0 deletions sdk/metric/internal/x/x.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package x contains support for OTel metric SDK experimental features.
package x // import "go.opentelemetry.io/otel/sdk/metric/internal/x"

import (
"os"
"strings"
)

// EnvKeyRoot is the environment variable key prefix shared by all
// experimental feature flags of the metric SDK. A Feature's full key is
// EnvKeyRoot + Feature.EnvKeySuffix.
const EnvKeyRoot = "OTEL_GO_X_"

// CardinalityLimit is the experimental feature flag that controls whether
// cardinality limiting of metric SDK aggregations is enabled. It is enabled
// by setting the OTEL_GO_X_CARDINALITY_LIMIT environment variable to "true"
// (case-insensitive).
var CardinalityLimit = Feature{
	EnvKeySuffix: "CARDINALITY_LIMIT",
	// TODO: support accepting number values here to set the cardinality
	// limit.
	EnablementVals: []string{"true"},
}

// Feature is an experimental feature of the metric SDK whose enablement is
// determined from the environment by the Enabled function.
type Feature struct {
	// EnvKeySuffix is the environment variable key suffix the Feature is
	// stored at. It is assumed EnvKeyRoot is the base of the environment
	// variable key.
	EnvKeySuffix string
	// EnablementVals are the case-insensitive comparison values that indicate
	// the Feature is enabled.
	EnablementVals []string
}

// Enabled returns if the Feature is enabled.
func Enabled(f Feature) bool {
MrAlias marked this conversation as resolved.
Show resolved Hide resolved
key := EnvKeyRoot + f.EnvKeySuffix
vRaw, present := os.LookupEnv(key)
if !present {
return false
}

v := strings.ToLower(vRaw)
for _, allowed := range f.EnablementVals {
if v == strings.ToLower(allowed) {
return true
}

Check warning on line 56 in sdk/metric/internal/x/x.go

View check run for this annotation

Codecov / codecov/patch

sdk/metric/internal/x/x.go#L52-L56

Added lines #L52 - L56 were not covered by tests
}
return false

Check warning on line 58 in sdk/metric/internal/x/x.go

View check run for this annotation

Codecov / codecov/patch

sdk/metric/internal/x/x.go#L58

Added line #L58 was not covered by tests
}