From 3bc9dccb67613b62e482dced7bc0761ade59eb2e Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Fri, 18 Aug 2023 16:37:40 -0700 Subject: [PATCH 01/18] Add agg limiting func --- sdk/metric/internal/aggregate/limit.go | 37 ++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) create mode 100644 sdk/metric/internal/aggregate/limit.go diff --git a/sdk/metric/internal/aggregate/limit.go b/sdk/metric/internal/aggregate/limit.go new file mode 100644 index 00000000000..52023469180 --- /dev/null +++ b/sdk/metric/internal/aggregate/limit.go @@ -0,0 +1,37 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate" + +import "go.opentelemetry.io/otel/attribute" + +// overflowSet is the attribute set used to record a measurement when adding +// another distinct attribute set to the aggregate would exceed the aggregate +// limit. +var overflowSet = attribute.NewSet(attribute.Bool("otel.metric.overflow", true)) + +// limtAttr checks if adding a measurement for a will exceed the limit of the +// already measured values in m. If it will, overflowSet is returned. +// Otherwise, if it will not exceed the limit, or the limit is not set (limit +// <= 0), a is returned. +func limitAttr[V any](a attribute.Set, m map[attribute.Set]V, limit int) attribute.Set { + if limit > 0 { + _, exists := m[a] + if !exists && len(m) >= limit-1 { + return overflowSet + } + } + + return a +} From c8ce214d481279f834c11b3a3ddf4665579fbae0 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Fri, 18 Aug 2023 16:42:37 -0700 Subject: [PATCH 02/18] Add unit test for limitAttr --- sdk/metric/internal/aggregate/limit_test.go | 47 +++++++++++++++++++++ 1 file changed, 47 insertions(+) create mode 100644 sdk/metric/internal/aggregate/limit_test.go diff --git a/sdk/metric/internal/aggregate/limit_test.go b/sdk/metric/internal/aggregate/limit_test.go new file mode 100644 index 00000000000..cfe2749eafb --- /dev/null +++ b/sdk/metric/internal/aggregate/limit_test.go @@ -0,0 +1,47 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate" + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/otel/attribute" +) + +func TestLimitAttr(t *testing.T) { + m := map[attribute.Set]struct{}{alice: {}} + + t.Run("NoLimit", func(t *testing.T) { + assert.Equal(t, alice, limitAttr(alice, m, 0)) + assert.Equal(t, bob, limitAttr(bob, m, 0)) + }) + + t.Run("NotAtLimit/Exists", func(t *testing.T) { + assert.Equal(t, alice, limitAttr(alice, m, 3)) + }) + + t.Run("NotAtLimit/DoesNotExist", func(t *testing.T) { + assert.Equal(t, bob, limitAttr(bob, m, 3)) + }) + + t.Run("AtLimit/Exists", func(t *testing.T) { + assert.Equal(t, alice, limitAttr(alice, m, 2)) + }) + + t.Run("AtLimit/DoesNotExist", func(t *testing.T) { + assert.Equal(t, overflowSet, limitAttr(bob, m, 2)) + }) +} From c79f14ee9ab368c190cde1ccf0a218ed13aafe69 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Fri, 18 Aug 2023 16:42:54 -0700 Subject: [PATCH 03/18] Add limiting to aggregate types --- sdk/metric/internal/aggregate/aggregate.go | 12 +++++++----- .../internal/aggregate/exponential_histogram.go | 5 ++++- .../aggregate/exponential_histogram_test.go | 4 ++-- sdk/metric/internal/aggregate/histogram.go | 9 ++++++--- sdk/metric/internal/aggregate/histogram_test.go | 6 +++--- sdk/metric/internal/aggregate/lastvalue.go | 4 +++- sdk/metric/internal/aggregate/sum.go | 17 +++++++++++------ 7 files changed, 36 insertions(+), 21 deletions(-) diff --git a/sdk/metric/internal/aggregate/aggregate.go b/sdk/metric/internal/aggregate/aggregate.go index 8dec14237b9..ba607b6376a 100644 --- a/sdk/metric/internal/aggregate/aggregate.go +++ b/sdk/metric/internal/aggregate/aggregate.go @@ -44,6 +44,8 @@ type Builder[N int64 | float64] struct { // Filter is the attribute filter the aggregate function will use on the // input of measurements. Filter attribute.Filter + // TODO: doc + AggregationLimit int } func (b Builder[N]) filter(f Measure[N]) Measure[N] { @@ -63,7 +65,7 @@ func (b Builder[N]) filter(f Measure[N]) Measure[N] { func (b Builder[N]) LastValue() (Measure[N], ComputeAggregation) { // Delta temporality is the only temporality that makes semantic sense for // a last-value aggregate. - lv := newLastValue[N]() + lv := newLastValue[N](b.AggregationLimit) return b.filter(lv.measure), func(dest *metricdata.Aggregation) int { // Ignore if dest is not a metricdata.Gauge. The chance for memory @@ -79,7 +81,7 @@ func (b Builder[N]) LastValue() (Measure[N], ComputeAggregation) { // PrecomputedSum returns a sum aggregate function input and output. The // arguments passed to the input are expected to be the precomputed sum values. func (b Builder[N]) PrecomputedSum(monotonic bool) (Measure[N], ComputeAggregation) { - s := newPrecomputedSum[N](monotonic) + s := newPrecomputedSum[N](monotonic, b.AggregationLimit) switch b.Temporality { case metricdata.DeltaTemporality: return b.filter(s.measure), s.delta @@ -90,7 +92,7 @@ func (b Builder[N]) PrecomputedSum(monotonic bool) (Measure[N], ComputeAggregati // Sum returns a sum aggregate function input and output. func (b Builder[N]) Sum(monotonic bool) (Measure[N], ComputeAggregation) { - s := newSum[N](monotonic) + s := newSum[N](monotonic, b.AggregationLimit) switch b.Temporality { case metricdata.DeltaTemporality: return b.filter(s.measure), s.delta @@ -102,7 +104,7 @@ func (b Builder[N]) Sum(monotonic bool) (Measure[N], ComputeAggregation) { // ExplicitBucketHistogram returns a histogram aggregate function input and // output. func (b Builder[N]) ExplicitBucketHistogram(boundaries []float64, noMinMax, noSum bool) (Measure[N], ComputeAggregation) { - h := newHistogram[N](boundaries, noMinMax, noSum) + h := newHistogram[N](boundaries, noMinMax, noSum, b.AggregationLimit) switch b.Temporality { case metricdata.DeltaTemporality: return b.filter(h.measure), h.delta @@ -114,7 +116,7 @@ func (b Builder[N]) ExplicitBucketHistogram(boundaries []float64, noMinMax, noSu // ExponentialBucketHistogram returns a histogram aggregate function input and // output. func (b Builder[N]) ExponentialBucketHistogram(maxSize, maxScale int32, noMinMax, noSum bool) (Measure[N], ComputeAggregation) { - h := newExponentialHistogram[N](maxSize, maxScale, noMinMax, noSum) + h := newExponentialHistogram[N](maxSize, maxScale, noMinMax, noSum, b.AggregationLimit) switch b.Temporality { case metricdata.DeltaTemporality: return b.filter(h.measure), h.delta diff --git a/sdk/metric/internal/aggregate/exponential_histogram.go b/sdk/metric/internal/aggregate/exponential_histogram.go index 368f0027ec3..006e03c5ddc 100644 --- a/sdk/metric/internal/aggregate/exponential_histogram.go +++ b/sdk/metric/internal/aggregate/exponential_histogram.go @@ -288,13 +288,14 @@ func (b *expoBuckets) downscale(delta int) { // newExponentialHistogram returns an Aggregator that summarizes a set of // measurements as an exponential histogram. Each histogram is scoped by attributes // and the aggregation cycle the measurements were made in. -func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool) *expoHistogram[N] { +func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool, limit int) *expoHistogram[N] { return &expoHistogram[N]{ noSum: noSum, noMinMax: noMinMax, maxSize: int(maxSize), maxScale: int(maxScale), + limit: limit, values: make(map[attribute.Set]*expoHistogramDataPoint[N]), start: now(), @@ -309,6 +310,7 @@ type expoHistogram[N int64 | float64] struct { maxSize int maxScale int + limit int values map[attribute.Set]*expoHistogramDataPoint[N] valuesMu sync.Mutex @@ -324,6 +326,7 @@ func (e *expoHistogram[N]) measure(_ context.Context, value N, attr attribute.Se e.valuesMu.Lock() defer e.valuesMu.Unlock() + attr = limitAttr(attr, e.values, e.limit) v, ok := e.values[attr] if !ok { v = newExpoHistogramDataPoint[N](e.maxSize, e.maxScale, e.noMinMax, e.noSum) diff --git a/sdk/metric/internal/aggregate/exponential_histogram_test.go b/sdk/metric/internal/aggregate/exponential_histogram_test.go index cac734c9312..5d7e534ca56 100644 --- a/sdk/metric/internal/aggregate/exponential_histogram_test.go +++ b/sdk/metric/internal/aggregate/exponential_histogram_test.go @@ -188,7 +188,7 @@ func testExpoHistogramMinMaxSumInt64(t *testing.T) { restore := withHandler(t) defer restore() - h := newExponentialHistogram[int64](4, 20, false, false) + h := newExponentialHistogram[int64](4, 20, false, false, 0) for _, v := range tt.values { h.measure(context.Background(), v, alice) } @@ -230,7 +230,7 @@ func testExpoHistogramMinMaxSumFloat64(t *testing.T) { restore := withHandler(t) defer restore() - h := newExponentialHistogram[float64](4, 20, false, false) + h := newExponentialHistogram[float64](4, 20, false, false, 0) for _, v := range tt.values { h.measure(context.Background(), v, alice) } diff --git a/sdk/metric/internal/aggregate/histogram.go b/sdk/metric/internal/aggregate/histogram.go index 62ec51e1f5e..6b4a2a71c41 100644 --- a/sdk/metric/internal/aggregate/histogram.go +++ b/sdk/metric/internal/aggregate/histogram.go @@ -54,11 +54,12 @@ type histValues[N int64 | float64] struct { noSum bool bounds []float64 + limit int values map[attribute.Set]*buckets[N] valuesMu sync.Mutex } -func newHistValues[N int64 | float64](bounds []float64, noSum bool) *histValues[N] { +func newHistValues[N int64 | float64](bounds []float64, noSum bool, limit int) *histValues[N] { // The responsibility of keeping all buckets correctly associated with the // passed boundaries is ultimately this type's responsibility. Make a copy // here so we can always guarantee this. Or, in the case of failure, have @@ -69,6 +70,7 @@ func newHistValues[N int64 | float64](bounds []float64, noSum bool) *histValues[ return &histValues[N]{ noSum: noSum, bounds: b, + limit: limit, values: make(map[attribute.Set]*buckets[N]), } } @@ -86,6 +88,7 @@ func (s *histValues[N]) measure(_ context.Context, value N, attr attribute.Set) s.valuesMu.Lock() defer s.valuesMu.Unlock() + attr = limitAttr(attr, s.values, s.limit) b, ok := s.values[attr] if !ok { // N+1 buckets. For example: @@ -108,9 +111,9 @@ func (s *histValues[N]) measure(_ context.Context, value N, attr attribute.Set) // newHistogram returns an Aggregator that summarizes a set of measurements as // an histogram. -func newHistogram[N int64 | float64](boundaries []float64, noMinMax, noSum bool) *histogram[N] { +func newHistogram[N int64 | float64](boundaries []float64, noMinMax, noSum bool, limit int) *histogram[N] { return &histogram[N]{ - histValues: newHistValues[N](boundaries, noSum), + histValues: newHistValues[N](boundaries, noSum, limit), noMinMax: noMinMax, start: now(), } diff --git a/sdk/metric/internal/aggregate/histogram_test.go b/sdk/metric/internal/aggregate/histogram_test.go index ab44607e5f6..f0075615a04 100644 --- a/sdk/metric/internal/aggregate/histogram_test.go +++ b/sdk/metric/internal/aggregate/histogram_test.go @@ -273,7 +273,7 @@ func TestHistogramImmutableBounds(t *testing.T) { cpB := make([]float64, len(b)) copy(cpB, b) - h := newHistogram[int64](b, false, false) + h := newHistogram[int64](b, false, false, 0) require.Equal(t, cpB, h.bounds) b[0] = 10 @@ -289,7 +289,7 @@ func TestHistogramImmutableBounds(t *testing.T) { } func TestCumulativeHistogramImutableCounts(t *testing.T) { - h := newHistogram[int64](bounds, noMinMax, false) + h := newHistogram[int64](bounds, noMinMax, false, 0) h.measure(context.Background(), 5, alice) var data metricdata.Aggregation = metricdata.Histogram[int64]{} @@ -307,7 +307,7 @@ func TestCumulativeHistogramImutableCounts(t *testing.T) { func TestDeltaHistogramReset(t *testing.T) { t.Cleanup(mockTime(now)) - h := newHistogram[int64](bounds, noMinMax, false) + h := newHistogram[int64](bounds, noMinMax, false, 0) var data metricdata.Aggregation = metricdata.Histogram[int64]{} require.Equal(t, 0, h.delta(&data)) diff --git a/sdk/metric/internal/aggregate/lastvalue.go b/sdk/metric/internal/aggregate/lastvalue.go index 6af2d606141..409d4e956a8 100644 --- a/sdk/metric/internal/aggregate/lastvalue.go +++ b/sdk/metric/internal/aggregate/lastvalue.go @@ -29,7 +29,7 @@ type datapoint[N int64 | float64] struct { value N } -func newLastValue[N int64 | float64]() *lastValue[N] { +func newLastValue[N int64 | float64](limit int) *lastValue[N] { return &lastValue[N]{values: make(map[attribute.Set]datapoint[N])} } @@ -37,12 +37,14 @@ func newLastValue[N int64 | float64]() *lastValue[N] { type lastValue[N int64 | float64] struct { sync.Mutex + limit int values map[attribute.Set]datapoint[N] } func (s *lastValue[N]) measure(ctx context.Context, value N, attr attribute.Set) { d := datapoint[N]{timestamp: now(), value: value} s.Lock() + attr = limitAttr(attr, s.values, s.limit) s.values[attr] = d s.Unlock() } diff --git a/sdk/metric/internal/aggregate/sum.go b/sdk/metric/internal/aggregate/sum.go index 1e52ff0d1e5..fd81a38a286 100644 --- a/sdk/metric/internal/aggregate/sum.go +++ b/sdk/metric/internal/aggregate/sum.go @@ -26,15 +26,20 @@ import ( // valueMap is the storage for sums. type valueMap[N int64 | float64] struct { sync.Mutex + limit int values map[attribute.Set]N } -func newValueMap[N int64 | float64]() *valueMap[N] { - return &valueMap[N]{values: make(map[attribute.Set]N)} +func newValueMap[N int64 | float64](limit int) *valueMap[N] { + return &valueMap[N]{ + limit: limit, + values: make(map[attribute.Set]N), + } } func (s *valueMap[N]) measure(_ context.Context, value N, attr attribute.Set) { s.Lock() + attr = limitAttr(attr, s.values, s.limit) s.values[attr] += value s.Unlock() } @@ -42,9 +47,9 @@ func (s *valueMap[N]) measure(_ context.Context, value N, attr attribute.Set) { // newSum returns an aggregator that summarizes a set of measurements as their // arithmetic sum. Each sum is scoped by attributes and the aggregation cycle // the measurements were made in. -func newSum[N int64 | float64](monotonic bool) *sum[N] { +func newSum[N int64 | float64](monotonic bool, limit int) *sum[N] { return &sum[N]{ - valueMap: newValueMap[N](), + valueMap: newValueMap[N](limit), monotonic: monotonic, start: now(), } @@ -129,9 +134,9 @@ func (s *sum[N]) cumulative(dest *metricdata.Aggregation) int { // newPrecomputedSum returns an aggregator that summarizes a set of // observatrions as their arithmetic sum. Each sum is scoped by attributes and // the aggregation cycle the measurements were made in. -func newPrecomputedSum[N int64 | float64](monotonic bool) *precomputedSum[N] { +func newPrecomputedSum[N int64 | float64](monotonic bool, limit int) *precomputedSum[N] { return &precomputedSum[N]{ - valueMap: newValueMap[N](), + valueMap: newValueMap[N](limit), monotonic: monotonic, start: now(), } From 6aa1640bfe83429cd0eaaadd1bfaeb3f927d3985 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Fri, 18 Aug 2023 16:48:35 -0700 Subject: [PATCH 04/18] Add internal x pkg for experimental feature-flagging --- sdk/metric/internal/x/x.go | 59 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 59 insertions(+) create mode 100644 sdk/metric/internal/x/x.go diff --git a/sdk/metric/internal/x/x.go b/sdk/metric/internal/x/x.go new file mode 100644 index 00000000000..fea3a5805de --- /dev/null +++ b/sdk/metric/internal/x/x.go @@ -0,0 +1,59 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package x contains support for OTel metric SDK experimental features. +package x // import "go.opentelemetry.io/otel/sdk/metric/internal/x" + +import ( + "os" + "strings" +) + +const EnvKeyRoot = "OTEL_GO_X_" + +var ( + CardinalityLimit = Feature{ + EnvKeySuffix: "CARDINALITY_LIMIT", + // TODO: support accepting number values here to set the cardinality + // limit. + EnablementVals: []string{"true"}, + } +) + +type Feature struct { + // EnvKeySuffix is the environment variable key suffix the xFeature is + // stored at. It is assumed EnvKeyRoot is the base of the environment + // variable key. + EnvKeySuffix string + // EnablementVals are the case-insensitive comparison values that indicate + // the Feature is enabled. + EnablementVals []string +} + +// Enabled returns if the Feature is enabled. +func Enabled(f Feature) bool { + key := EnvKeyRoot + f.EnvKeySuffix + vRaw, present := os.LookupEnv(key) + if !present { + return false + } + + v := strings.ToLower(vRaw) + for _, allowed := range f.EnablementVals { + if v == strings.ToLower(allowed) { + return true + } + } + return false +} From cc11cb0070249f8835331002f1a673669d990daf Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Fri, 18 Aug 2023 16:51:51 -0700 Subject: [PATCH 05/18] Connect cardinality limit to metric SDK --- sdk/metric/limit.go | 28 ++++++++++++++++++++++++++++ sdk/metric/pipeline.go | 3 ++- 2 files changed, 30 insertions(+), 1 deletion(-) create mode 100644 sdk/metric/limit.go diff --git a/sdk/metric/limit.go b/sdk/metric/limit.go new file mode 100644 index 00000000000..589f80c4077 --- /dev/null +++ b/sdk/metric/limit.go @@ -0,0 +1,28 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metric // import "go.opentelemetry.io/otel/sdk/metric" + +import "go.opentelemetry.io/otel/sdk/metric/internal/x" + +func cardinalityLimit() int { + if !x.Enabled(x.CardinalityLimit) { + return 0 + } + + // TODO: make this configurable. + + // Default 2000. + return 2000 +} diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index d76231cff7f..132da653b75 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -349,7 +349,8 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind Instrum normID := id.normalize() cv := i.aggregators.Lookup(normID, func() aggVal[N] { b := aggregate.Builder[N]{ - Temporality: i.pipeline.reader.temporality(kind), + Temporality: i.pipeline.reader.temporality(kind), + AggregationLimit: cardinalityLimit(), } if len(stream.AllowAttributeKeys) > 0 { b.Filter = stream.attributeFilter() From 62311eb75027b78f2b2e879857a54598c194d273 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Wed, 6 Dec 2023 11:56:36 -0800 Subject: [PATCH 06/18] Replace limitAttr fn with limiter type The Attribute method is still inlinable. --- .../aggregate/exponential_histogram.go | 6 +-- sdk/metric/internal/aggregate/histogram.go | 6 +-- sdk/metric/internal/aggregate/lastvalue.go | 9 ++-- sdk/metric/internal/aggregate/limit.go | 29 ++++++---- sdk/metric/internal/aggregate/limit_test.go | 54 +++++++++++-------- sdk/metric/internal/aggregate/sum.go | 6 +-- 6 files changed, 67 insertions(+), 43 deletions(-) diff --git a/sdk/metric/internal/aggregate/exponential_histogram.go b/sdk/metric/internal/aggregate/exponential_histogram.go index 91099b32d89..e9c25980aa2 100644 --- a/sdk/metric/internal/aggregate/exponential_histogram.go +++ b/sdk/metric/internal/aggregate/exponential_histogram.go @@ -295,7 +295,7 @@ func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMa maxSize: int(maxSize), maxScale: int(maxScale), - limit: limit, + limit: newLimiter[*expoHistogramDataPoint[N]](limit), values: make(map[attribute.Set]*expoHistogramDataPoint[N]), start: now(), @@ -310,7 +310,7 @@ type expoHistogram[N int64 | float64] struct { maxSize int maxScale int - limit int + limit limiter[*expoHistogramDataPoint[N]] values map[attribute.Set]*expoHistogramDataPoint[N] valuesMu sync.Mutex @@ -326,7 +326,7 @@ func (e *expoHistogram[N]) measure(_ context.Context, value N, attr attribute.Se e.valuesMu.Lock() defer e.valuesMu.Unlock() - attr = limitAttr(attr, e.values, e.limit) + attr = e.limit.Attributes(attr, e.values) v, ok := e.values[attr] if !ok { v = newExpoHistogramDataPoint[N](e.maxSize, e.maxScale, e.noMinMax, e.noSum) diff --git a/sdk/metric/internal/aggregate/histogram.go b/sdk/metric/internal/aggregate/histogram.go index 6b4a2a71c41..5d886360bae 100644 --- a/sdk/metric/internal/aggregate/histogram.go +++ b/sdk/metric/internal/aggregate/histogram.go @@ -54,7 +54,7 @@ type histValues[N int64 | float64] struct { noSum bool bounds []float64 - limit int + limit limiter[*buckets[N]] values map[attribute.Set]*buckets[N] valuesMu sync.Mutex } @@ -70,7 +70,7 @@ func newHistValues[N int64 | float64](bounds []float64, noSum bool, limit int) * return &histValues[N]{ noSum: noSum, bounds: b, - limit: limit, + limit: newLimiter[*buckets[N]](limit), values: make(map[attribute.Set]*buckets[N]), } } @@ -88,7 +88,7 @@ func (s *histValues[N]) measure(_ context.Context, value N, attr attribute.Set) s.valuesMu.Lock() defer s.valuesMu.Unlock() - attr = limitAttr(attr, s.values, s.limit) + attr = s.limit.Attributes(attr, s.values) b, ok := s.values[attr] if !ok { // N+1 buckets. For example: diff --git a/sdk/metric/internal/aggregate/lastvalue.go b/sdk/metric/internal/aggregate/lastvalue.go index 409d4e956a8..b79e80a0c8d 100644 --- a/sdk/metric/internal/aggregate/lastvalue.go +++ b/sdk/metric/internal/aggregate/lastvalue.go @@ -30,21 +30,24 @@ type datapoint[N int64 | float64] struct { } func newLastValue[N int64 | float64](limit int) *lastValue[N] { - return &lastValue[N]{values: make(map[attribute.Set]datapoint[N])} + return &lastValue[N]{ + limit: newLimiter[datapoint[N]](limit), + values: make(map[attribute.Set]datapoint[N]), + } } // lastValue summarizes a set of measurements as the last one made. type lastValue[N int64 | float64] struct { sync.Mutex - limit int + limit limiter[datapoint[N]] values map[attribute.Set]datapoint[N] } func (s *lastValue[N]) measure(ctx context.Context, value N, attr attribute.Set) { d := datapoint[N]{timestamp: now(), value: value} s.Lock() - attr = limitAttr(attr, s.values, s.limit) + attr = s.limit.Attributes(attr, s.values) s.values[attr] = d s.Unlock() } diff --git a/sdk/metric/internal/aggregate/limit.go b/sdk/metric/internal/aggregate/limit.go index 52023469180..1894c38351b 100644 --- a/sdk/metric/internal/aggregate/limit.go +++ b/sdk/metric/internal/aggregate/limit.go @@ -21,17 +21,28 @@ import "go.opentelemetry.io/otel/attribute" // limit. var overflowSet = attribute.NewSet(attribute.Bool("otel.metric.overflow", true)) -// limtAttr checks if adding a measurement for a will exceed the limit of the -// already measured values in m. If it will, overflowSet is returned. -// Otherwise, if it will not exceed the limit, or the limit is not set (limit -// <= 0), a is returned. -func limitAttr[V any](a attribute.Set, m map[attribute.Set]V, limit int) attribute.Set { - if limit > 0 { - _, exists := m[a] - if !exists && len(m) >= limit-1 { +// limiter limits aggregate values. +type limiter[V any] struct { + // aggregation is the limit of unique attribute that can be aggregated. + aggregation int +} + +// newLimiter returns a new Limiter with the provided aggregation limit. +func newLimiter[V any](aggregation int) limiter[V] { + return limiter[V]{aggregation: aggregation} +} + +// Attributes checks if adding a measurement for attrs will exceed the +// aggregation cardinality limit for the existing measurements. If it will, +// overflowSet is returned. Otherwise, if it will not exceed the limit, or the +// limit is not set (limit <= 0), attr is returned. +func (l limiter[V]) Attributes(attrs attribute.Set, measurements map[attribute.Set]V) attribute.Set { + if l.aggregation > 0 { + _, exists := measurements[attrs] + if !exists && len(measurements) >= l.aggregation-1 { return overflowSet } } - return a + return attrs } diff --git a/sdk/metric/internal/aggregate/limit_test.go b/sdk/metric/internal/aggregate/limit_test.go index cfe2749eafb..ba1982b4aa6 100644 --- a/sdk/metric/internal/aggregate/limit_test.go +++ b/sdk/metric/internal/aggregate/limit_test.go @@ -21,27 +21,37 @@ import ( "go.opentelemetry.io/otel/attribute" ) -func TestLimitAttr(t *testing.T) { - m := map[attribute.Set]struct{}{alice: {}} - - t.Run("NoLimit", func(t *testing.T) { - assert.Equal(t, alice, limitAttr(alice, m, 0)) - assert.Equal(t, bob, limitAttr(bob, m, 0)) - }) - - t.Run("NotAtLimit/Exists", func(t *testing.T) { - assert.Equal(t, alice, limitAttr(alice, m, 3)) - }) - - t.Run("NotAtLimit/DoesNotExist", func(t *testing.T) { - assert.Equal(t, bob, limitAttr(bob, m, 3)) - }) - - t.Run("AtLimit/Exists", func(t *testing.T) { - assert.Equal(t, alice, limitAttr(alice, m, 2)) - }) +func TestLimiter(t *testing.T) { + t.Run("Attributes", testAttributes()) +} - t.Run("AtLimit/DoesNotExist", func(t *testing.T) { - assert.Equal(t, overflowSet, limitAttr(bob, m, 2)) - }) +func testAttributes() func(*testing.T) { + m := map[attribute.Set]struct{}{alice: {}} + return func(t *testing.T) { + t.Run("NoLimit", func(t *testing.T) { + l := newLimiter[struct{}](0) + assert.Equal(t, alice, l.Attributes(alice, m)) + assert.Equal(t, bob, l.Attributes(bob, m)) + }) + + t.Run("NotAtLimit/Exists", func(t *testing.T) { + l := newLimiter[struct{}](3) + assert.Equal(t, alice, l.Attributes(alice, m)) + }) + + t.Run("NotAtLimit/DoesNotExist", func(t *testing.T) { + l := newLimiter[struct{}](3) + assert.Equal(t, bob, l.Attributes(bob, m)) + }) + + t.Run("AtLimit/Exists", func(t *testing.T) { + l := newLimiter[struct{}](2) + assert.Equal(t, alice, l.Attributes(alice, m)) + }) + + t.Run("AtLimit/DoesNotExist", func(t *testing.T) { + l := newLimiter[struct{}](2) + assert.Equal(t, overflowSet, l.Attributes(bob, m)) + }) + } } diff --git a/sdk/metric/internal/aggregate/sum.go b/sdk/metric/internal/aggregate/sum.go index fd81a38a286..a0d26e1ddb9 100644 --- a/sdk/metric/internal/aggregate/sum.go +++ b/sdk/metric/internal/aggregate/sum.go @@ -26,20 +26,20 @@ import ( // valueMap is the storage for sums. type valueMap[N int64 | float64] struct { sync.Mutex - limit int + limit limiter[N] values map[attribute.Set]N } func newValueMap[N int64 | float64](limit int) *valueMap[N] { return &valueMap[N]{ - limit: limit, + limit: newLimiter[N](limit), values: make(map[attribute.Set]N), } } func (s *valueMap[N]) measure(_ context.Context, value N, attr attribute.Set) { s.Lock() - attr = limitAttr(attr, s.values, s.limit) + attr = s.limit.Attributes(attr, s.values) s.values[attr] += value s.Unlock() } From b53c71c070afacc4ea6d6e5d720bf188914fafa7 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Wed, 6 Dec 2023 12:05:54 -0800 Subject: [PATCH 07/18] Use x.CardinalityLimit directly --- sdk/metric/internal/x/x.go | 3 +++ sdk/metric/limit.go | 26 -------------------------- sdk/metric/pipeline.go | 10 ++++++++-- 3 files changed, 11 insertions(+), 28 deletions(-) delete mode 100644 sdk/metric/limit.go diff --git a/sdk/metric/internal/x/x.go b/sdk/metric/internal/x/x.go index 2891395725d..541160f9423 100644 --- a/sdk/metric/internal/x/x.go +++ b/sdk/metric/internal/x/x.go @@ -43,6 +43,9 @@ var ( // // To enable this feature set the OTEL_GO_X_CARDINALITY_LIMIT environment // variable to the integer limit value you want to use. + // + // Setting OTEL_GO_X_CARDINALITY_LIMIT to a value less than or equal to 0 + // will disable the cardinality limits. CardinalityLimit = newFeature("CARDINALITY_LIMIT", func(v string) (int, bool) { n, err := strconv.Atoi(v) if err != nil { diff --git a/sdk/metric/limit.go b/sdk/metric/limit.go deleted file mode 100644 index 0fb5be74aba..00000000000 --- a/sdk/metric/limit.go +++ /dev/null @@ -1,26 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package metric // import "go.opentelemetry.io/otel/sdk/metric" - -import "go.opentelemetry.io/otel/sdk/metric/internal/x" - -func cardinalityLimit() int { - if v, ok := x.CardinalityLimit.Lookup(); ok { - return v - } - - // Default 2000. - return 2000 -} diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index 4beb28512a9..71b3b7122d8 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -29,6 +29,7 @@ import ( "go.opentelemetry.io/otel/sdk/instrumentation" "go.opentelemetry.io/otel/sdk/metric/internal" "go.opentelemetry.io/otel/sdk/metric/internal/aggregate" + "go.opentelemetry.io/otel/sdk/metric/internal/x" "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/resource" ) @@ -358,10 +359,15 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind Instrum normID := id.normalize() cv := i.aggregators.Lookup(normID, func() aggVal[N] { b := aggregate.Builder[N]{ - Temporality: i.pipeline.reader.temporality(kind), - AggregationLimit: cardinalityLimit(), + Temporality: i.pipeline.reader.temporality(kind), } b.Filter = stream.AttributeFilter + // A value less than or equal to zero will disable the aggregation + // limits for the builder (an all the created aggregates). + // CardinalityLimit.Lookup returns 0 by default if unset (or + // unrecognized input). Use that value directly. + b.AggregationLimit, _ = x.CardinalityLimit.Lookup() + in, out, err := i.aggregateFunc(b, stream.Aggregation, kind) if err != nil { return aggVal[N]{0, nil, err} From 92efe658fc2b6872ded6373ba0664c5d53028cd7 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Wed, 6 Dec 2023 12:11:09 -0800 Subject: [PATCH 08/18] Simplify limiter test --- sdk/metric/internal/aggregate/limit_test.go | 58 +++++++++------------ 1 file changed, 26 insertions(+), 32 deletions(-) diff --git a/sdk/metric/internal/aggregate/limit_test.go b/sdk/metric/internal/aggregate/limit_test.go index ba1982b4aa6..590f91da88e 100644 --- a/sdk/metric/internal/aggregate/limit_test.go +++ b/sdk/metric/internal/aggregate/limit_test.go @@ -21,37 +21,31 @@ import ( "go.opentelemetry.io/otel/attribute" ) -func TestLimiter(t *testing.T) { - t.Run("Attributes", testAttributes()) -} - -func testAttributes() func(*testing.T) { +func TestLimiterAttributes(t *testing.T) { m := map[attribute.Set]struct{}{alice: {}} - return func(t *testing.T) { - t.Run("NoLimit", func(t *testing.T) { - l := newLimiter[struct{}](0) - assert.Equal(t, alice, l.Attributes(alice, m)) - assert.Equal(t, bob, l.Attributes(bob, m)) - }) - - t.Run("NotAtLimit/Exists", func(t *testing.T) { - l := newLimiter[struct{}](3) - assert.Equal(t, alice, l.Attributes(alice, m)) - }) - - t.Run("NotAtLimit/DoesNotExist", func(t *testing.T) { - l := newLimiter[struct{}](3) - assert.Equal(t, bob, l.Attributes(bob, m)) - }) - - t.Run("AtLimit/Exists", func(t *testing.T) { - l := newLimiter[struct{}](2) - assert.Equal(t, alice, l.Attributes(alice, m)) - }) - - t.Run("AtLimit/DoesNotExist", func(t *testing.T) { - l := newLimiter[struct{}](2) - assert.Equal(t, overflowSet, l.Attributes(bob, m)) - }) - } + t.Run("NoLimit", func(t *testing.T) { + l := newLimiter[struct{}](0) + assert.Equal(t, alice, l.Attributes(alice, m)) + assert.Equal(t, bob, l.Attributes(bob, m)) + }) + + t.Run("NotAtLimit/Exists", func(t *testing.T) { + l := newLimiter[struct{}](3) + assert.Equal(t, alice, l.Attributes(alice, m)) + }) + + t.Run("NotAtLimit/DoesNotExist", func(t *testing.T) { + l := newLimiter[struct{}](3) + assert.Equal(t, bob, l.Attributes(bob, m)) + }) + + t.Run("AtLimit/Exists", func(t *testing.T) { + l := newLimiter[struct{}](2) + assert.Equal(t, alice, l.Attributes(alice, m)) + }) + + t.Run("AtLimit/DoesNotExist", func(t *testing.T) { + l := newLimiter[struct{}](2) + assert.Equal(t, overflowSet, l.Attributes(bob, m)) + }) } From 27df16c6b4dbcf49acbf01c7feeff868fa8ff844 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Wed, 6 Dec 2023 12:31:38 -0800 Subject: [PATCH 09/18] Add limiter benchmark --- sdk/metric/internal/aggregate/limit_test.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/sdk/metric/internal/aggregate/limit_test.go b/sdk/metric/internal/aggregate/limit_test.go index 590f91da88e..895e05e9b1a 100644 --- a/sdk/metric/internal/aggregate/limit_test.go +++ b/sdk/metric/internal/aggregate/limit_test.go @@ -49,3 +49,18 @@ func TestLimiterAttributes(t *testing.T) { assert.Equal(t, overflowSet, l.Attributes(bob, m)) }) } + +var limitedAttr attribute.Set + +func BenchmarkLimiterAttributes(b *testing.B) { + m := map[attribute.Set]struct{}{alice: {}} + l := newLimiter[struct{}](2) + + b.ReportAllocs() + b.ResetTimer() + + for n := 0; n < b.N; n++ { + limitedAttr = l.Attributes(alice, m) + limitedAttr = l.Attributes(bob, m) + } +} From c2d5246b3512ffd3653f7006ecf52a75390d1096 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Wed, 6 Dec 2023 12:36:26 -0800 Subject: [PATCH 10/18] Document the AggregationLimit field --- sdk/metric/internal/aggregate/aggregate.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/sdk/metric/internal/aggregate/aggregate.go b/sdk/metric/internal/aggregate/aggregate.go index ba607b6376a..c61f8513789 100644 --- a/sdk/metric/internal/aggregate/aggregate.go +++ b/sdk/metric/internal/aggregate/aggregate.go @@ -44,7 +44,13 @@ type Builder[N int64 | float64] struct { // Filter is the attribute filter the aggregate function will use on the // input of measurements. Filter attribute.Filter - // TODO: doc + // AggregationLimit is the cardinality limit of measurement attributes. Any + // measurement for new attributes once the limit has been reached will be + // aggregated into a single aggregate for the "otel.metric.overflow" + // attribute. + // + // If AggregationLimit is less than or equal to zero there will not be an + // aggregation limit imposed (i.e. unlimited attribute sets). AggregationLimit int } From 426b29d20c73bf0b9965fcdc810fa524329ed07d Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Wed, 6 Dec 2023 13:15:50 -0800 Subject: [PATCH 11/18] Test sum limits --- .../internal/aggregate/aggregate_test.go | 4 + sdk/metric/internal/aggregate/sum_test.go | 162 +++++++++++++++++- 2 files changed, 158 insertions(+), 8 deletions(-) diff --git a/sdk/metric/internal/aggregate/aggregate_test.go b/sdk/metric/internal/aggregate/aggregate_test.go index 79031b40218..384ca51c8cf 100644 --- a/sdk/metric/internal/aggregate/aggregate_test.go +++ b/sdk/metric/internal/aggregate/aggregate_test.go @@ -31,11 +31,15 @@ var ( keyUser = "user" userAlice = attribute.String(keyUser, "Alice") userBob = attribute.String(keyUser, "Bob") + userCarol = attribute.String(keyUser, "Carol") + userDave = attribute.String(keyUser, "Dave") adminTrue = attribute.Bool("admin", true) adminFalse = attribute.Bool("admin", false) alice = attribute.NewSet(userAlice, adminTrue) bob = attribute.NewSet(userBob, adminFalse) + carol = attribute.NewSet(userCarol, adminFalse) + dave = attribute.NewSet(userDave, adminFalse) // Filtered. attrFltr = func(kv attribute.KeyValue) bool { diff --git a/sdk/metric/internal/aggregate/sum_test.go b/sdk/metric/internal/aggregate/sum_test.go index 3ac675a096b..b169fcad455 100644 --- a/sdk/metric/internal/aggregate/sum_test.go +++ b/sdk/metric/internal/aggregate/sum_test.go @@ -40,8 +40,9 @@ func TestSum(t *testing.T) { func testDeltaSum[N int64 | float64]() func(t *testing.T) { mono := false in, out := Builder[N]{ - Temporality: metricdata.DeltaTemporality, - Filter: attrFltr, + Temporality: metricdata.DeltaTemporality, + Filter: attrFltr, + AggregationLimit: 3, }.Sum(mono) ctx := context.Background() return test[N](in, out, []teststep[N]{ @@ -125,14 +126,51 @@ func testDeltaSum[N int64 | float64]() func(t *testing.T) { }, }, }, + { + input: []arg[N]{ + {ctx, 1, alice}, + {ctx, 1, bob}, + // These will exceed cardinality limit. + {ctx, 1, carol}, + {ctx, 1, dave}, + }, + expect: output{ + n: 3, + agg: metricdata.Sum[N]{ + IsMonotonic: mono, + Temporality: metricdata.DeltaTemporality, + DataPoints: []metricdata.DataPoint[N]{ + { + Attributes: fltrAlice, + StartTime: staticTime, + Time: staticTime, + Value: 1, + }, + { + Attributes: fltrBob, + StartTime: staticTime, + Time: staticTime, + Value: 1, + }, + { + Attributes: overflowSet, + StartTime: staticTime, + Time: staticTime, + Value: 2, + }, + }, + }, + }, + }, }) } func testCumulativeSum[N int64 | float64]() func(t *testing.T) { mono := false in, out := Builder[N]{ - Temporality: metricdata.CumulativeTemporality, - Filter: attrFltr, + Temporality: metricdata.CumulativeTemporality, + Filter: attrFltr, + AggregationLimit: 3, }.Sum(mono) ctx := context.Background() return test[N](in, out, []teststep[N]{ @@ -204,14 +242,49 @@ func testCumulativeSum[N int64 | float64]() func(t *testing.T) { }, }, }, + { + input: []arg[N]{ + // These will exceed cardinality limit. + {ctx, 1, carol}, + {ctx, 1, dave}, + }, + expect: output{ + n: 3, + agg: metricdata.Sum[N]{ + IsMonotonic: mono, + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.DataPoint[N]{ + { + Attributes: fltrAlice, + StartTime: staticTime, + Time: staticTime, + Value: 14, + }, + { + Attributes: fltrBob, + StartTime: staticTime, + Time: staticTime, + Value: -8, + }, + { + Attributes: overflowSet, + StartTime: staticTime, + Time: staticTime, + Value: 2, + }, + }, + }, + }, + }, }) } func testDeltaPrecomputedSum[N int64 | float64]() func(t *testing.T) { mono := false in, out := Builder[N]{ - Temporality: metricdata.DeltaTemporality, - Filter: attrFltr, + Temporality: metricdata.DeltaTemporality, + Filter: attrFltr, + AggregationLimit: 3, }.PrecomputedSum(mono) ctx := context.Background() return test[N](in, out, []teststep[N]{ @@ -296,14 +369,51 @@ func testDeltaPrecomputedSum[N int64 | float64]() func(t *testing.T) { }, }, }, + { + input: []arg[N]{ + {ctx, 1, alice}, + {ctx, 1, bob}, + // These will exceed cardinality limit. + {ctx, 1, carol}, + {ctx, 1, dave}, + }, + expect: output{ + n: 3, + agg: metricdata.Sum[N]{ + IsMonotonic: mono, + Temporality: metricdata.DeltaTemporality, + DataPoints: []metricdata.DataPoint[N]{ + { + Attributes: fltrAlice, + StartTime: staticTime, + Time: staticTime, + Value: 1, + }, + { + Attributes: fltrBob, + StartTime: staticTime, + Time: staticTime, + Value: 1, + }, + { + Attributes: overflowSet, + StartTime: staticTime, + Time: staticTime, + Value: 2, + }, + }, + }, + }, + }, }) } func testCumulativePrecomputedSum[N int64 | float64]() func(t *testing.T) { mono := false in, out := Builder[N]{ - Temporality: metricdata.CumulativeTemporality, - Filter: attrFltr, + Temporality: metricdata.CumulativeTemporality, + Filter: attrFltr, + AggregationLimit: 3, }.PrecomputedSum(mono) ctx := context.Background() return test[N](in, out, []teststep[N]{ @@ -388,6 +498,42 @@ func testCumulativePrecomputedSum[N int64 | float64]() func(t *testing.T) { }, }, }, + { + input: []arg[N]{ + {ctx, 1, alice}, + {ctx, 1, bob}, + // These will exceed cardinality limit. + {ctx, 1, carol}, + {ctx, 1, dave}, + }, + expect: output{ + n: 3, + agg: metricdata.Sum[N]{ + IsMonotonic: mono, + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.DataPoint[N]{ + { + Attributes: fltrAlice, + StartTime: staticTime, + Time: staticTime, + Value: 1, + }, + { + Attributes: fltrBob, + StartTime: staticTime, + Time: staticTime, + Value: 1, + }, + { + Attributes: overflowSet, + StartTime: staticTime, + Time: staticTime, + Value: 2, + }, + }, + }, + }, + }, }) } From 3a59317ad1bf257b881df06c1a405cf7b7eeeabd Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Wed, 6 Dec 2023 13:18:11 -0800 Subject: [PATCH 12/18] Test limit for last value --- .../internal/aggregate/lastvalue_test.go | 35 ++++++++++++++++++- 1 file changed, 34 insertions(+), 1 deletion(-) diff --git a/sdk/metric/internal/aggregate/lastvalue_test.go b/sdk/metric/internal/aggregate/lastvalue_test.go index c758eb370c7..479232ad435 100644 --- a/sdk/metric/internal/aggregate/lastvalue_test.go +++ b/sdk/metric/internal/aggregate/lastvalue_test.go @@ -29,7 +29,10 @@ func TestLastValue(t *testing.T) { } func testLastValue[N int64 | float64]() func(*testing.T) { - in, out := Builder[N]{Filter: attrFltr}.LastValue() + in, out := Builder[N]{ + Filter: attrFltr, + AggregationLimit: 3, + }.LastValue() ctx := context.Background() return test[N](in, out, []teststep[N]{ { @@ -87,6 +90,36 @@ func testLastValue[N int64 | float64]() func(*testing.T) { }, }, }, + }, { + input: []arg[N]{ + {ctx, 1, alice}, + {ctx, 1, bob}, + // These will exceed cardinality limit. + {ctx, 1, carol}, + {ctx, 1, dave}, + }, + expect: output{ + n: 3, + agg: metricdata.Gauge[N]{ + DataPoints: []metricdata.DataPoint[N]{ + { + Attributes: fltrAlice, + Time: staticTime, + Value: 1, + }, + { + Attributes: fltrBob, + Time: staticTime, + Value: 1, + }, + { + Attributes: overflowSet, + Time: staticTime, + Value: 1, + }, + }, + }, + }, }, }) } From 0354e683e554f91d316f90804a3c8d4c3e86f54f Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Wed, 6 Dec 2023 13:21:13 -0800 Subject: [PATCH 13/18] Test histogram limit --- .../internal/aggregate/histogram_test.go | 48 +++++++++++++++++-- 1 file changed, 44 insertions(+), 4 deletions(-) diff --git a/sdk/metric/internal/aggregate/histogram_test.go b/sdk/metric/internal/aggregate/histogram_test.go index f0075615a04..e51e9fc8f29 100644 --- a/sdk/metric/internal/aggregate/histogram_test.go +++ b/sdk/metric/internal/aggregate/histogram_test.go @@ -53,8 +53,9 @@ type conf[N int64 | float64] struct { func testDeltaHist[N int64 | float64](c conf[N]) func(t *testing.T) { in, out := Builder[N]{ - Temporality: metricdata.DeltaTemporality, - Filter: attrFltr, + Temporality: metricdata.DeltaTemporality, + Filter: attrFltr, + AggregationLimit: 3, }.ExplicitBucketHistogram(bounds, noMinMax, c.noSum) ctx := context.Background() return test[N](in, out, []teststep[N]{ @@ -114,13 +115,34 @@ func testDeltaHist[N int64 | float64](c conf[N]) func(t *testing.T) { }, }, }, + { + input: []arg[N]{ + {ctx, 1, alice}, + {ctx, 1, bob}, + // These will exceed cardinality limit. + {ctx, 1, carol}, + {ctx, 1, dave}, + }, + expect: output{ + n: 3, + agg: metricdata.Histogram[N]{ + Temporality: metricdata.DeltaTemporality, + DataPoints: []metricdata.HistogramDataPoint[N]{ + c.hPt(fltrAlice, 1, 1), + c.hPt(fltrBob, 1, 1), + c.hPt(overflowSet, 1, 2), + }, + }, + }, + }, }) } func testCumulativeHist[N int64 | float64](c conf[N]) func(t *testing.T) { in, out := Builder[N]{ - Temporality: metricdata.CumulativeTemporality, - Filter: attrFltr, + Temporality: metricdata.CumulativeTemporality, + Filter: attrFltr, + AggregationLimit: 3, }.ExplicitBucketHistogram(bounds, noMinMax, c.noSum) ctx := context.Background() return test[N](in, out, []teststep[N]{ @@ -182,6 +204,24 @@ func testCumulativeHist[N int64 | float64](c conf[N]) func(t *testing.T) { }, }, }, + { + input: []arg[N]{ + // These will exceed cardinality limit. + {ctx, 1, carol}, + {ctx, 1, dave}, + }, + expect: output{ + n: 3, + agg: metricdata.Histogram[N]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.HistogramDataPoint[N]{ + c.hPt(fltrAlice, 2, 4), + c.hPt(fltrBob, 10, 3), + c.hPt(overflowSet, 1, 2), + }, + }, + }, + }, }) } From f1f3be7b206a11650e81e47dd8740261518f8374 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Wed, 6 Dec 2023 14:05:45 -0800 Subject: [PATCH 14/18] Refactor expo hist test to use existing fixtures The tests for the exponential histogram create their own testing fixtures. There is nothing these new fixtures do that cannot already be done with the existing testing fixtures used by all the other aggregate functions. Unify the exponential histogram testing to use the existing fixtures. --- .../aggregate/exponential_histogram_test.go | 249 +++++++++--------- 1 file changed, 119 insertions(+), 130 deletions(-) diff --git a/sdk/metric/internal/aggregate/exponential_histogram_test.go b/sdk/metric/internal/aggregate/exponential_histogram_test.go index 8442881f4e0..37e7f41e620 100644 --- a/sdk/metric/internal/aggregate/exponential_histogram_test.go +++ b/sdk/metric/internal/aggregate/exponential_histogram_test.go @@ -23,10 +23,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/internal/global" "go.opentelemetry.io/otel/sdk/metric/metricdata" - "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" ) type noErrorHandler struct{ t *testing.T } @@ -739,161 +737,152 @@ func TestSubNormal(t *testing.T) { } func TestExponentialHistogramAggregation(t *testing.T) { - t.Run("Int64", testExponentialHistogramAggregation[int64]) - t.Run("Float64", testExponentialHistogramAggregation[float64]) -} + t.Cleanup(mockTime(now)) -func testExponentialHistogramAggregation[N int64 | float64](t *testing.T) { - const ( - maxSize = 4 - maxScale = 20 - noMinMax = false - noSum = false - ) + t.Run("Int64/Delta", testDeltaExpoHist[int64]()) + t.Run("Float64/Delta", testDeltaExpoHist[float64]()) + t.Run("Int64/Cumulative", testCumulativeExpoHist[int64]()) + t.Run("Float64/Cumulative", testCumulativeExpoHist[float64]()) +} - tests := []struct { - name string - build func() (Measure[N], ComputeAggregation) - input [][]N - want metricdata.ExponentialHistogram[N] - wantCount int - }{ +func testDeltaExpoHist[N int64 | float64]() func(t *testing.T) { + in, out := Builder[N]{ + Temporality: metricdata.DeltaTemporality, + Filter: attrFltr, + }.ExponentialBucketHistogram(4, 20, false, false) + ctx := context.Background() + return test[N](in, out, []teststep[N]{ + { + input: []arg[N]{}, + expect: output{ + n: 0, + agg: metricdata.ExponentialHistogram[N]{ + Temporality: metricdata.DeltaTemporality, + DataPoints: []metricdata.ExponentialHistogramDataPoint[N]{}, + }, + }, + }, { - name: "Delta Single", - build: func() (Measure[N], ComputeAggregation) { - return Builder[N]{ + input: []arg[N]{ + {ctx, 4, alice}, + {ctx, 4, alice}, + {ctx, 4, alice}, + {ctx, 2, alice}, + {ctx, 16, alice}, + {ctx, 1, alice}, + }, + expect: output{ + n: 1, + agg: metricdata.ExponentialHistogram[N]{ Temporality: metricdata.DeltaTemporality, - }.ExponentialBucketHistogram(maxSize, maxScale, noMinMax, noSum) - }, - input: [][]N{ - {4, 4, 4, 2, 16, 1}, - }, - want: metricdata.ExponentialHistogram[N]{ - Temporality: metricdata.DeltaTemporality, - DataPoints: []metricdata.ExponentialHistogramDataPoint[N]{ - { - Count: 6, - Min: metricdata.NewExtrema[N](1), - Max: metricdata.NewExtrema[N](16), - Sum: 31, - Scale: -1, - PositiveBucket: metricdata.ExponentialBucket{ - Offset: -1, - Counts: []uint64{1, 4, 1}, + DataPoints: []metricdata.ExponentialHistogramDataPoint[N]{ + { + Attributes: fltrAlice, + StartTime: staticTime, + Time: staticTime, + Count: 6, + Min: metricdata.NewExtrema[N](1), + Max: metricdata.NewExtrema[N](16), + Sum: 31, + Scale: -1, + PositiveBucket: metricdata.ExponentialBucket{ + Offset: -1, + Counts: []uint64{1, 4, 1}, + }, }, }, }, }, - wantCount: 1, }, { - name: "Cumulative Single", - build: func() (Measure[N], ComputeAggregation) { - return Builder[N]{ + // Delta sums are expected to reset. + input: []arg[N]{}, + expect: output{ + n: 0, + agg: metricdata.ExponentialHistogram[N]{ + Temporality: metricdata.DeltaTemporality, + DataPoints: []metricdata.ExponentialHistogramDataPoint[N]{}, + }, + }, + }, + }) +} + +func testCumulativeExpoHist[N int64 | float64]() func(t *testing.T) { + in, out := Builder[N]{ + Temporality: metricdata.CumulativeTemporality, + Filter: attrFltr, + }.ExponentialBucketHistogram(4, 20, false, false) + ctx := context.Background() + return test[N](in, out, []teststep[N]{ + { + input: []arg[N]{}, + expect: output{ + n: 0, + agg: metricdata.ExponentialHistogram[N]{ Temporality: metricdata.CumulativeTemporality, - }.ExponentialBucketHistogram(maxSize, maxScale, noMinMax, noSum) - }, - input: [][]N{ - {4, 4, 4, 2, 16, 1}, - }, - want: metricdata.ExponentialHistogram[N]{ - Temporality: metricdata.CumulativeTemporality, - DataPoints: []metricdata.ExponentialHistogramDataPoint[N]{ - { - Count: 6, - Min: metricdata.NewExtrema[N](1), - Max: metricdata.NewExtrema[N](16), - Sum: 31, - Scale: -1, - PositiveBucket: metricdata.ExponentialBucket{ - Offset: -1, - Counts: []uint64{1, 4, 1}, - }, - }, + DataPoints: []metricdata.ExponentialHistogramDataPoint[N]{}, }, }, - wantCount: 1, }, { - name: "Delta Multiple", - build: func() (Measure[N], ComputeAggregation) { - return Builder[N]{ - Temporality: metricdata.DeltaTemporality, - }.ExponentialBucketHistogram(maxSize, maxScale, noMinMax, noSum) - }, - input: [][]N{ - {2, 3, 8}, - {4, 4, 4, 2, 16, 1}, - }, - want: metricdata.ExponentialHistogram[N]{ - Temporality: metricdata.DeltaTemporality, - DataPoints: []metricdata.ExponentialHistogramDataPoint[N]{ - { - Count: 6, - Min: metricdata.NewExtrema[N](1), - Max: metricdata.NewExtrema[N](16), - Sum: 31, - Scale: -1, - PositiveBucket: metricdata.ExponentialBucket{ - Offset: -1, - Counts: []uint64{1, 4, 1}, + input: []arg[N]{ + {ctx, 4, alice}, + {ctx, 4, alice}, + {ctx, 4, alice}, + {ctx, 2, alice}, + {ctx, 16, alice}, + {ctx, 1, alice}, + }, + expect: output{ + n: 1, + agg: metricdata.ExponentialHistogram[N]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.ExponentialHistogramDataPoint[N]{ + { + Attributes: fltrAlice, + StartTime: staticTime, + Time: staticTime, + Count: 6, + Min: metricdata.NewExtrema[N](1), + Max: metricdata.NewExtrema[N](16), + Sum: 31, + Scale: -1, + PositiveBucket: metricdata.ExponentialBucket{ + Offset: -1, + Counts: []uint64{1, 4, 1}, + }, }, }, }, }, - wantCount: 1, }, { - name: "Cumulative Multiple ", - build: func() (Measure[N], ComputeAggregation) { - return Builder[N]{ + input: []arg[N]{}, + expect: output{ + n: 1, + agg: metricdata.ExponentialHistogram[N]{ Temporality: metricdata.CumulativeTemporality, - }.ExponentialBucketHistogram(maxSize, maxScale, noMinMax, noSum) - }, - input: [][]N{ - {2, 3, 8}, - {4, 4, 4, 2, 16, 1}, - }, - want: metricdata.ExponentialHistogram[N]{ - Temporality: metricdata.CumulativeTemporality, - DataPoints: []metricdata.ExponentialHistogramDataPoint[N]{ - { - Count: 9, - Min: metricdata.NewExtrema[N](1), - Max: metricdata.NewExtrema[N](16), - Sum: 44, - Scale: -1, - PositiveBucket: metricdata.ExponentialBucket{ - Offset: -1, - Counts: []uint64{1, 6, 2}, + DataPoints: []metricdata.ExponentialHistogramDataPoint[N]{ + { + Attributes: fltrAlice, + StartTime: staticTime, + Time: staticTime, + Count: 6, + Min: metricdata.NewExtrema[N](1), + Max: metricdata.NewExtrema[N](16), + Sum: 31, + Scale: -1, + PositiveBucket: metricdata.ExponentialBucket{ + Offset: -1, + Counts: []uint64{1, 4, 1}, + }, }, }, }, }, - wantCount: 1, }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - restore := withHandler(t) - defer restore() - in, out := tt.build() - ctx := context.Background() - - var got metricdata.Aggregation - var count int - for _, n := range tt.input { - for _, v := range n { - in(ctx, v, *attribute.EmptySet()) - } - count = out(&got) - } - - metricdatatest.AssertAggregationsEqual(t, tt.want, got, metricdatatest.IgnoreTimestamp()) - assert.Equal(t, tt.wantCount, count) - }) - } + }) } func FuzzGetBin(f *testing.F) { From 4874fcb95838da07028b46ee27b44b788525e0fe Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Wed, 6 Dec 2023 14:14:50 -0800 Subject: [PATCH 15/18] Test the ExponentialHistogram limit --- .../aggregate/exponential_histogram_test.go | 110 +++++++++++++++++- 1 file changed, 106 insertions(+), 4 deletions(-) diff --git a/sdk/metric/internal/aggregate/exponential_histogram_test.go b/sdk/metric/internal/aggregate/exponential_histogram_test.go index 37e7f41e620..af097567144 100644 --- a/sdk/metric/internal/aggregate/exponential_histogram_test.go +++ b/sdk/metric/internal/aggregate/exponential_histogram_test.go @@ -747,8 +747,9 @@ func TestExponentialHistogramAggregation(t *testing.T) { func testDeltaExpoHist[N int64 | float64]() func(t *testing.T) { in, out := Builder[N]{ - Temporality: metricdata.DeltaTemporality, - Filter: attrFltr, + Temporality: metricdata.DeltaTemporality, + Filter: attrFltr, + AggregationLimit: 2, }.ExponentialBucketHistogram(4, 20, false, false) ctx := context.Background() return test[N](in, out, []teststep[N]{ @@ -805,13 +806,67 @@ func testDeltaExpoHist[N int64 | float64]() func(t *testing.T) { }, }, }, + { + input: []arg[N]{ + {ctx, 4, alice}, + {ctx, 4, alice}, + {ctx, 4, alice}, + {ctx, 2, alice}, + {ctx, 16, alice}, + {ctx, 1, alice}, + // These will exceed the cardinality limit. + {ctx, 4, bob}, + {ctx, 4, bob}, + {ctx, 4, bob}, + {ctx, 2, carol}, + {ctx, 16, carol}, + {ctx, 1, dave}, + }, + expect: output{ + n: 2, + agg: metricdata.ExponentialHistogram[N]{ + Temporality: metricdata.DeltaTemporality, + DataPoints: []metricdata.ExponentialHistogramDataPoint[N]{ + { + Attributes: fltrAlice, + StartTime: staticTime, + Time: staticTime, + Count: 6, + Min: metricdata.NewExtrema[N](1), + Max: metricdata.NewExtrema[N](16), + Sum: 31, + Scale: -1, + PositiveBucket: metricdata.ExponentialBucket{ + Offset: -1, + Counts: []uint64{1, 4, 1}, + }, + }, + { + Attributes: overflowSet, + StartTime: staticTime, + Time: staticTime, + Count: 6, + Min: metricdata.NewExtrema[N](1), + Max: metricdata.NewExtrema[N](16), + Sum: 31, + Scale: -1, + PositiveBucket: metricdata.ExponentialBucket{ + Offset: -1, + Counts: []uint64{1, 4, 1}, + }, + }, + }, + }, + }, + }, }) } func testCumulativeExpoHist[N int64 | float64]() func(t *testing.T) { in, out := Builder[N]{ - Temporality: metricdata.CumulativeTemporality, - Filter: attrFltr, + Temporality: metricdata.CumulativeTemporality, + Filter: attrFltr, + AggregationLimit: 2, }.ExponentialBucketHistogram(4, 20, false, false) ctx := context.Background() return test[N](in, out, []teststep[N]{ @@ -882,6 +937,53 @@ func testCumulativeExpoHist[N int64 | float64]() func(t *testing.T) { }, }, }, + { + input: []arg[N]{ + // These will exceed the cardinality limit. + {ctx, 4, bob}, + {ctx, 4, bob}, + {ctx, 4, bob}, + {ctx, 2, carol}, + {ctx, 16, carol}, + {ctx, 1, dave}, + }, + expect: output{ + n: 2, + agg: metricdata.ExponentialHistogram[N]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.ExponentialHistogramDataPoint[N]{ + { + Attributes: fltrAlice, + StartTime: staticTime, + Time: staticTime, + Count: 6, + Min: metricdata.NewExtrema[N](1), + Max: metricdata.NewExtrema[N](16), + Sum: 31, + Scale: -1, + PositiveBucket: metricdata.ExponentialBucket{ + Offset: -1, + Counts: []uint64{1, 4, 1}, + }, + }, + { + Attributes: overflowSet, + StartTime: staticTime, + Time: staticTime, + Count: 6, + Min: metricdata.NewExtrema[N](1), + Max: metricdata.NewExtrema[N](16), + Sum: 31, + Scale: -1, + PositiveBucket: metricdata.ExponentialBucket{ + Offset: -1, + Counts: []uint64{1, 4, 1}, + }, + }, + }, + }, + }, + }, }) } From 3fa01f178e70944a8ce28afab7a551445f966246 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Wed, 6 Dec 2023 14:23:39 -0800 Subject: [PATCH 16/18] Fix lint --- sdk/metric/internal/aggregate/limit_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/metric/internal/aggregate/limit_test.go b/sdk/metric/internal/aggregate/limit_test.go index 895e05e9b1a..cd524d33966 100644 --- a/sdk/metric/internal/aggregate/limit_test.go +++ b/sdk/metric/internal/aggregate/limit_test.go @@ -18,6 +18,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "go.opentelemetry.io/otel/attribute" ) From ea4fe06effa49c596367bf924fec713196fe473e Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Wed, 6 Dec 2023 15:02:36 -0800 Subject: [PATCH 17/18] Add docs --- CHANGELOG.md | 2 ++ sdk/metric/EXPERIMENTAL.md | 50 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 52 insertions(+) create mode 100644 sdk/metric/EXPERIMENTAL.md diff --git a/CHANGELOG.md b/CHANGELOG.md index a0abcf5fb1d..c33efeb3269 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - The `go.opentelemetry.io/otel/semconv/v1.22.0` package. The package contains semantic conventions from the `v1.22.0` version of the OpenTelemetry Semantic Conventions. (#4735) - Add `WithResourceAsConstantLabels` option to apply resource attributes for every metric emitted by the Prometheus exporter. (#4733) +- Experimental cardinality limiting is added to the metric SDK. + See [metric documentation](./sdk/metric/EXPERIMENTAL.md#cardinality-limit) for more information about this feature and how to enable it. (#4457) ### Changed diff --git a/sdk/metric/EXPERIMENTAL.md b/sdk/metric/EXPERIMENTAL.md new file mode 100644 index 00000000000..d2a5a0470b6 --- /dev/null +++ b/sdk/metric/EXPERIMENTAL.md @@ -0,0 +1,50 @@ +# Experimental Features + +The metric SDK contains features that have not yet stabilized in the OpenTelemetry specification. +These features are added to the OpenTelemetry Go metric SDK prior to stabilization in the specification so that users can start experimenting with them and provide feedback. + +These feature may change in backwards incompatible ways as feedback is applied. +See the [Compatibility and Stability](#compatibility-and-stability) section for more information. + +## Features + +- [Cardinality Limit](#cardinality-limit) + +### Cardinality Limit + +The cardinality limit is the hard limit on the number of metric streams that can be collected for a single instrument. + +This experimental feature can be enabled by setting the `OTEL_GO_X_CARDINALITY_LIMIT` environment value. +The value must be an integer value. +All other values are ignored. + +If the value set is less than or equal to `0`, no limit will be applied. + +#### Examples + +Set the cardinality limit to 2000. + +```console +export OTEL_GO_X_CARDINALITY_LIMIT=2000 +``` + +Set an infinite cardinality limit (functionally equivalent to disabling the feature). + +```console +export OTEL_GO_X_CARDINALITY_LIMIT=-1 +``` + +Disable the cardinality limit. + +```console +unset OTEL_GO_X_CARDINALITY_LIMIT +``` + +## Compatibility and Stability + +Experimental features do not fall within the scope of the OpenTelemetry Go versioning and stability [policy](../../VERSIONING.md). +These features may be removed or modified in successive version releases, including patch versions. + +When an experimental feature is promoted to a stable feature, a migration path will be included in the changelog entry of the release. +There is no guarantee that any environment variable feature flags that enabled the experimental feature will be supported by the stable version. +If they are supported, they may be accompanied with a deprecation notice stating a timeline for the removal of that support. From 5bc739e167cd926c4d22dd8df85ac8d0f8ddebb8 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Tue, 12 Dec 2023 08:00:47 -0800 Subject: [PATCH 18/18] Rename aggregation field to aggLimit --- sdk/metric/internal/aggregate/limit.go | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/sdk/metric/internal/aggregate/limit.go b/sdk/metric/internal/aggregate/limit.go index 1894c38351b..d3de8427200 100644 --- a/sdk/metric/internal/aggregate/limit.go +++ b/sdk/metric/internal/aggregate/limit.go @@ -23,13 +23,18 @@ var overflowSet = attribute.NewSet(attribute.Bool("otel.metric.overflow", true)) // limiter limits aggregate values. type limiter[V any] struct { - // aggregation is the limit of unique attribute that can be aggregated. - aggregation int + // aggLimit is the maximum number of metric streams that can be aggregated. + // + // Any metric stream with attributes distinct from any set already + // aggregated once the aggLimit will be meet will instead be aggregated + // into an "overflow" metric stream. That stream will only contain the + // "otel.metric.overflow"=true attribute. + aggLimit int } // newLimiter returns a new Limiter with the provided aggregation limit. func newLimiter[V any](aggregation int) limiter[V] { - return limiter[V]{aggregation: aggregation} + return limiter[V]{aggLimit: aggregation} } // Attributes checks if adding a measurement for attrs will exceed the @@ -37,9 +42,9 @@ func newLimiter[V any](aggregation int) limiter[V] { // overflowSet is returned. Otherwise, if it will not exceed the limit, or the // limit is not set (limit <= 0), attr is returned. func (l limiter[V]) Attributes(attrs attribute.Set, measurements map[attribute.Set]V) attribute.Set { - if l.aggregation > 0 { + if l.aggLimit > 0 { _, exists := measurements[attrs] - if !exists && len(measurements) >= l.aggregation-1 { + if !exists && len(measurements) >= l.aggLimit-1 { return overflowSet } }