Skip to content

Commit

Permalink
Adjust Cardinality Limit to Accommodate Internal Reserves (#5382)
Browse files Browse the repository at this point in the history
  • Loading branch information
rajkumar-rangaraj authored Mar 7, 2024
1 parent bae8a6b commit b90c3b2
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 68 deletions.
54 changes: 25 additions & 29 deletions src/OpenTelemetry/Metrics/AggregatorStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ internal sealed class AggregatorStore
#endif
internal readonly bool OutputDelta;
internal readonly bool OutputDeltaWithUnusedMetricPointReclaimEnabled;
internal readonly int CardinalityLimit;
internal readonly int NumberOfMetricPoints;
internal readonly bool EmitOverflowAttribute;
internal readonly ConcurrentDictionary<Tags, LookupData>? TagsToMetricPointIndexDictionaryDelta;
internal readonly Func<ExemplarReservoir?>? ExemplarReservoirFactory;
Expand Down Expand Up @@ -71,11 +71,15 @@ internal AggregatorStore(
Func<ExemplarReservoir?>? exemplarReservoirFactory = null)
{
this.name = metricStreamIdentity.InstrumentName;
this.CardinalityLimit = cardinalityLimit;

this.metricPointCapHitMessage = $"Maximum MetricPoints limit reached for this Metric stream. Configured limit: {this.CardinalityLimit}";
this.metricPoints = new MetricPoint[cardinalityLimit];
this.currentMetricPointBatch = new int[cardinalityLimit];
// Increase the CardinalityLimit by 2 to reserve additional space.
// This adjustment accounts for overflow attribute and a case where zero tags are provided.
// Previously, these were included within the original cardinalityLimit, but now they are explicitly added to enhance clarity.
this.NumberOfMetricPoints = cardinalityLimit + 2;

this.metricPointCapHitMessage = $"Maximum MetricPoints limit reached for this Metric stream. Configured limit: {cardinalityLimit}";
this.metricPoints = new MetricPoint[this.NumberOfMetricPoints];
this.currentMetricPointBatch = new int[this.NumberOfMetricPoints];
this.aggType = aggType;
this.OutputDelta = temporality == AggregationTemporality.Delta;
this.histogramBounds = metricStreamIdentity.HistogramBucketBounds ?? FindDefaultHistogramBounds(in metricStreamIdentity);
Expand Down Expand Up @@ -110,31 +114,26 @@ internal AggregatorStore(
|| this.exemplarFilter == ExemplarFilterType.TraceBased,
"this.exemplarFilter had an unexpected value");

var reservedMetricPointsCount = 1;

if (emitOverflowAttribute)
{
// Setting metricPointIndex to 1 as we would reserve the metricPoints[1] for overflow attribute.
// Newer attributes should be added starting at the index: 2
this.metricPointIndex = 1;
reservedMetricPointsCount++;
}
// Setting metricPointIndex to 1 as we would reserve the metricPoints[1] for overflow attribute.
// Newer attributes should be added starting at the index: 2
this.metricPointIndex = 1;

this.OutputDeltaWithUnusedMetricPointReclaimEnabled = shouldReclaimUnusedMetricPoints && this.OutputDelta;

if (this.OutputDeltaWithUnusedMetricPointReclaimEnabled)
{
this.availableMetricPoints = new Queue<int>(cardinalityLimit - reservedMetricPointsCount);
this.availableMetricPoints = new Queue<int>(cardinalityLimit);

// There is no overload which only takes capacity as the parameter
// Using the DefaultConcurrencyLevel defined in the ConcurrentDictionary class: https://github.com/dotnet/runtime/blob/v7.0.5/src/libraries/System.Collections.Concurrent/src/System/Collections/Concurrent/ConcurrentDictionary.cs#L2020
// We expect at the most (maxMetricPoints - reservedMetricPointsCount) * 2 entries- one for sorted and one for unsorted input
// We expect at the most (user provided cardinality limit) * 2 entries- one for sorted and one for unsorted input
this.TagsToMetricPointIndexDictionaryDelta =
new ConcurrentDictionary<Tags, LookupData>(concurrencyLevel: Environment.ProcessorCount, capacity: (cardinalityLimit - reservedMetricPointsCount) * 2);
new ConcurrentDictionary<Tags, LookupData>(concurrencyLevel: Environment.ProcessorCount, capacity: cardinalityLimit * 2);

// Add all the indices except for the reserved ones to the queue so that threads have
// readily available access to these MetricPoints for their use.
for (int i = reservedMetricPointsCount; i < this.CardinalityLimit; i++)
// Index 0 and 1 are reserved for no tags and overflow
for (int i = 2; i < this.NumberOfMetricPoints; i++)
{
this.availableMetricPoints.Enqueue(i);
}
Expand Down Expand Up @@ -199,12 +198,12 @@ internal int Snapshot()
}
else if (this.OutputDelta)
{
var indexSnapshot = Math.Min(this.metricPointIndex, this.CardinalityLimit - 1);
var indexSnapshot = Math.Min(this.metricPointIndex, this.NumberOfMetricPoints - 1);
this.SnapshotDelta(indexSnapshot);
}
else
{
var indexSnapshot = Math.Min(this.metricPointIndex, this.CardinalityLimit - 1);
var indexSnapshot = Math.Min(this.metricPointIndex, this.NumberOfMetricPoints - 1);
this.SnapshotCumulative(indexSnapshot);
}

Expand Down Expand Up @@ -260,12 +259,8 @@ internal void SnapshotDeltaWithMetricPointReclaim()
this.batchSize++;
}

int startIndexForReclaimableMetricPoints = 1;

if (this.EmitOverflowAttribute)
{
startIndexForReclaimableMetricPoints = 2; // Index 0 and 1 are reserved for no tags and overflow

// TakeSnapshot for the MetricPoint for overflow
ref var metricPointForOverflow = ref this.metricPoints[1];
if (metricPointForOverflow.MetricPointStatus != MetricPointStatus.NoCollectPending)
Expand All @@ -284,7 +279,8 @@ internal void SnapshotDeltaWithMetricPointReclaim()
}
}

for (int i = startIndexForReclaimableMetricPoints; i < this.CardinalityLimit; i++)
// Index 0 and 1 are reserved for no tags and overflow
for (int i = 2; i < this.NumberOfMetricPoints; i++)
{
ref var metricPoint = ref this.metricPoints[i];

Expand Down Expand Up @@ -473,7 +469,7 @@ private int LookupAggregatorStore(KeyValuePair<string, object?>[] tagKeysAndValu
if (!this.tagsToMetricPointIndexDictionary.TryGetValue(sortedTags, out aggregatorIndex))
{
aggregatorIndex = this.metricPointIndex;
if (aggregatorIndex >= this.CardinalityLimit)
if (aggregatorIndex >= this.NumberOfMetricPoints)
{
// sorry! out of data points.
// TODO: Once we support cleanup of
Expand Down Expand Up @@ -502,7 +498,7 @@ private int LookupAggregatorStore(KeyValuePair<string, object?>[] tagKeysAndValu
if (!this.tagsToMetricPointIndexDictionary.TryGetValue(sortedTags, out aggregatorIndex))
{
aggregatorIndex = ++this.metricPointIndex;
if (aggregatorIndex >= this.CardinalityLimit)
if (aggregatorIndex >= this.NumberOfMetricPoints)
{
// sorry! out of data points.
// TODO: Once we support cleanup of
Expand All @@ -529,7 +525,7 @@ private int LookupAggregatorStore(KeyValuePair<string, object?>[] tagKeysAndValu
{
// This else block is for tag length = 1
aggregatorIndex = this.metricPointIndex;
if (aggregatorIndex >= this.CardinalityLimit)
if (aggregatorIndex >= this.NumberOfMetricPoints)
{
// sorry! out of data points.
// TODO: Once we support cleanup of
Expand All @@ -551,7 +547,7 @@ private int LookupAggregatorStore(KeyValuePair<string, object?>[] tagKeysAndValu
if (!this.tagsToMetricPointIndexDictionary.TryGetValue(givenTags, out aggregatorIndex))
{
aggregatorIndex = ++this.metricPointIndex;
if (aggregatorIndex >= this.CardinalityLimit)
if (aggregatorIndex >= this.NumberOfMetricPoints)
{
// sorry! out of data points.
// TODO: Once we support cleanup of
Expand Down
10 changes: 1 addition & 9 deletions src/OpenTelemetry/Metrics/MetricReaderExt.cs
Original file line number Diff line number Diff line change
Expand Up @@ -176,17 +176,9 @@ internal void ApplyParentProviderSettings(
this.metrics = new Metric[metricLimit];
this.metricsCurrentBatch = new Metric[metricLimit];
this.cardinalityLimit = cardinalityLimit;
this.emitOverflowAttribute = emitOverflowAttribute;
this.reclaimUnusedMetricPoints = reclaimUnusedMetricPoints;
this.exemplarFilter = exemplarFilter;

if (emitOverflowAttribute)
{
// We need at least two metric points. One is reserved for zero tags and the other one for overflow attribute
if (cardinalityLimit > 1)
{
this.emitOverflowAttribute = true;
}
}
}

private bool TryGetExistingMetric(in MetricStreamIdentity metricStreamIdentity, [NotNullWhen(true)] out Metric? existingMetric)
Expand Down
8 changes: 6 additions & 2 deletions src/OpenTelemetry/Metrics/MetricStreamConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,12 @@ public string[]? TagKeys
/// <para>Spec reference: <see
/// href="https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#cardinality-limits">Cardinality
/// limits</see>.</para>
/// Note: If not set the default MeterProvider cardinality limit of 2000
/// will apply.
/// Note: The cardinality limit determines the maximum number of unique
/// dimension combinations for metrics.
/// Metrics with zero dimensions and overflow metrics are treated specially
/// and do not count against this limit.
/// If not set the default
/// MeterProvider cardinality limit of 2000 will apply.
/// </remarks>
#if NET8_0_OR_GREATER
[Experimental(DiagnosticDefinitions.CardinalityLimitExperimentalApi, UrlFormat = DiagnosticDefinitions.ExperimentalApiUrlFormat)]
Expand Down
17 changes: 16 additions & 1 deletion test/OpenTelemetry.Tests/Metrics/MetricApiTestsBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1399,7 +1399,22 @@ int MetricPointCount()

foreach (var metric in exportedItems)
{
foreach (ref readonly var metricPoint in metric.GetMetricPoints())
var enumerator = metric.GetMetricPoints().GetEnumerator();

// A case with zero tags and overflow attribute and are not a part of cardinality limit. Avoid counting them.
enumerator.MoveNext(); // First element reserved for zero tags.
enumerator.MoveNext(); // Second element reserved for overflow attribute.

// Validate second element is overflow attribute.
// Overflow attribute is behind experimental flag. So, it is not guaranteed to be present.
var tagEnumerator = enumerator.Current.Tags.GetEnumerator();
tagEnumerator.MoveNext();
if (!tagEnumerator.Current.Key.Contains("otel.metric.overflow"))
{
count++;
}

while (enumerator.MoveNext())
{
count++;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,10 @@ public void TestEmitOverflowAttributeConfigWithOtherConfigProvider(string value,
}

[Theory]
[InlineData(1, false)]
[InlineData(2, true)]
[InlineData(10, true)]
public void EmitOverflowAttributeIsOnlySetWhenMaxMetricPointsIsGreaterThanOne(int maxMetricPoints, bool isEmitOverflowAttributeKeySet)
[InlineData(1)]
[InlineData(2)]
[InlineData(10)]
public void EmitOverflowAttributeIsNotDependentOnMaxMetricPoints(int maxMetricPoints)
{
var exportedItems = new List<Metric>();

Expand All @@ -129,7 +129,7 @@ public void EmitOverflowAttributeIsOnlySetWhenMaxMetricPointsIsGreaterThanOne(in
meterProvider.ForceFlush();

Assert.Single(exportedItems);
Assert.Equal(isEmitOverflowAttributeKeySet, exportedItems[0].AggregatorStore.EmitOverflowAttribute);
Assert.True(exportedItems[0].AggregatorStore.EmitOverflowAttribute);
}

[Theory]
Expand Down Expand Up @@ -158,7 +158,7 @@ public void MetricOverflowAttributeIsRecordedCorrectlyForCounter(MetricReaderTem
counter.Add(10); // Record measurement for zero tags

// Max number for MetricPoints available for use when emitted with tags
int maxMetricPointsForUse = MeterProviderBuilderSdk.DefaultCardinalityLimit - 2;
int maxMetricPointsForUse = MeterProviderBuilderSdk.DefaultCardinalityLimit;

for (int i = 0; i < maxMetricPointsForUse; i++)
{
Expand Down Expand Up @@ -186,7 +186,7 @@ public void MetricOverflowAttributeIsRecordedCorrectlyForCounter(MetricReaderTem
exportedItems.Clear();
metricPoints.Clear();

counter.Add(5, new KeyValuePair<string, object>("Key", 1998)); // Emit a metric to exceed the max MetricPoint limit
counter.Add(5, new KeyValuePair<string, object>("Key", 2000)); // Emit a metric to exceed the max MetricPoint limit

meterProvider.ForceFlush();
metric = exportedItems[0];
Expand Down Expand Up @@ -215,7 +215,7 @@ public void MetricOverflowAttributeIsRecordedCorrectlyForCounter(MetricReaderTem
counter.Add(15); // Record another measurement for zero tags

// Emit 2500 more newer MetricPoints with distinct dimension combinations
for (int i = 2000; i < 4500; i++)
for (int i = 2001; i < 4501; i++)
{
counter.Add(5, new KeyValuePair<string, object>("Key", i));
}
Expand All @@ -236,11 +236,11 @@ public void MetricOverflowAttributeIsRecordedCorrectlyForCounter(MetricReaderTem

int expectedSum;

// Number of metric points that were available before the 2500 measurements were made = 2000 (max MetricPoints) - 2 (reserved for zero tags and overflow) = 1998
// Number of metric points that were available before the 2500 measurements were made = 2000 (max MetricPoints)
if (this.shouldReclaimUnusedMetricPoints)
{
// If unused metric points are reclaimed, then number of metric points dropped = 2500 - 1998 = 502
expectedSum = 2510; // 502 * 5
// If unused metric points are reclaimed, then number of metric points dropped = 2500 - 2000 = 500
expectedSum = 2500; // 500 * 5
}
else
{
Expand Down Expand Up @@ -309,7 +309,7 @@ public void MetricOverflowAttributeIsRecordedCorrectlyForHistogram(MetricReaderT
histogram.Record(10); // Record measurement for zero tags

// Max number for MetricPoints available for use when emitted with tags
int maxMetricPointsForUse = MeterProviderBuilderSdk.DefaultCardinalityLimit - 2;
int maxMetricPointsForUse = MeterProviderBuilderSdk.DefaultCardinalityLimit;

for (int i = 0; i < maxMetricPointsForUse; i++)
{
Expand Down Expand Up @@ -337,7 +337,7 @@ public void MetricOverflowAttributeIsRecordedCorrectlyForHistogram(MetricReaderT
exportedItems.Clear();
metricPoints.Clear();

histogram.Record(5, new KeyValuePair<string, object>("Key", 1998)); // Emit a metric to exceed the max MetricPoint limit
histogram.Record(5, new KeyValuePair<string, object>("Key", 2000)); // Emit a metric to exceed the max MetricPoint limit

meterProvider.ForceFlush();
metric = exportedItems[0];
Expand Down Expand Up @@ -366,7 +366,7 @@ public void MetricOverflowAttributeIsRecordedCorrectlyForHistogram(MetricReaderT
histogram.Record(15); // Record another measurement for zero tags

// Emit 2500 more newer MetricPoints with distinct dimension combinations
for (int i = 2000; i < 4500; i++)
for (int i = 2001; i < 4501; i++)
{
histogram.Record(5, new KeyValuePair<string, object>("Key", i));
}
Expand All @@ -388,12 +388,12 @@ public void MetricOverflowAttributeIsRecordedCorrectlyForHistogram(MetricReaderT
int expectedCount;
int expectedSum;

// Number of metric points that were available before the 2500 measurements were made = 2000 (max MetricPoints) - 2 (reserved for zero tags and overflow) = 1998
// Number of metric points that were available before the 2500 measurements were made = 2000 (max MetricPoints)
if (this.shouldReclaimUnusedMetricPoints)
{
// If unused metric points are reclaimed, then number of metric points dropped = 2500 - 1998 = 502
expectedCount = 502;
expectedSum = 2510; // 502 * 5
// If unused metric points are reclaimed, then number of metric points dropped = 2500 - 2000 = 500
expectedCount = 500;
expectedSum = 2500; // 500 * 5
}
else
{
Expand All @@ -407,7 +407,6 @@ public void MetricOverflowAttributeIsRecordedCorrectlyForHistogram(MetricReaderT
else
{
Assert.Equal(25, zeroTagsMetricPoint.GetHistogramSum());

Assert.Equal(2501, overflowMetricPoint.GetHistogramCount());
Assert.Equal(12505, overflowMetricPoint.GetHistogramSum()); // 5 + (2500 * 5)
}
Expand Down
16 changes: 8 additions & 8 deletions test/OpenTelemetry.Tests/Metrics/MetricViewTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -963,16 +963,16 @@ public void CardinalityLimitofMatchingViewTakesPrecedenceOverMeterProvider(bool

Assert.Equal(3, exportedItems.Count);

Assert.Equal(10000, exportedItems[1].AggregatorStore.CardinalityLimit);
Assert.Equal(10002, exportedItems[1].AggregatorStore.NumberOfMetricPoints);
if (setDefault)
{
Assert.Equal(3, exportedItems[0].AggregatorStore.CardinalityLimit);
Assert.Equal(3, exportedItems[2].AggregatorStore.CardinalityLimit);
Assert.Equal(5, exportedItems[0].AggregatorStore.NumberOfMetricPoints);
Assert.Equal(5, exportedItems[2].AggregatorStore.NumberOfMetricPoints);
}
else
{
Assert.Equal(2000, exportedItems[0].AggregatorStore.CardinalityLimit);
Assert.Equal(2000, exportedItems[2].AggregatorStore.CardinalityLimit);
Assert.Equal(2002, exportedItems[0].AggregatorStore.NumberOfMetricPoints);
Assert.Equal(2002, exportedItems[2].AggregatorStore.NumberOfMetricPoints);
}
}

Expand Down Expand Up @@ -1015,15 +1015,15 @@ public void ViewConflict_TwoDistinctInstruments_ThreeStreams()
var metricB = exportedItems[1];
var metricC = exportedItems[2];

Assert.Equal(256, metricA.AggregatorStore.CardinalityLimit);
Assert.Equal(258, metricA.AggregatorStore.NumberOfMetricPoints);
Assert.Equal("MetricStreamA", metricA.Name);
Assert.Equal(20, GetAggregatedValue(metricA));

Assert.Equal(3, metricB.AggregatorStore.CardinalityLimit);
Assert.Equal(5, metricB.AggregatorStore.NumberOfMetricPoints);
Assert.Equal("MetricStreamB", metricB.Name);
Assert.Equal(10, GetAggregatedValue(metricB));

Assert.Equal(200000, metricC.AggregatorStore.CardinalityLimit);
Assert.Equal(200002, metricC.AggregatorStore.NumberOfMetricPoints);
Assert.Equal("MetricStreamC", metricC.Name);
Assert.Equal(10, GetAggregatedValue(metricC));

Expand Down

0 comments on commit b90c3b2

Please sign in to comment.