feat: Add and enable Metric cardinality capping by default #2901

Merged
9 changes: 9 additions & 0 deletions opentelemetry-sdk/CHANGELOG.md
@@ -12,6 +12,15 @@ the suppression flag in their dedicated thread, so that telemetry generated from
those threads will not be fed back into OTel. Similarly, `SimpleLogProcessor`
also modified to suppress telemetry before invoking exporters.

- **Feature**: Implemented and enabled cardinality capping for Metrics by
default.
- The default cardinality limit is 2000 and can be customized using Views.
- This feature was previously removed in version 0.28 due to the lack of
configurability but has now been reintroduced with the ability to configure
the limit.
- TODO/Placeholder: Add ability to configure cardinality limits via Instrument
advisory.

## 0.29.0

Released 2025-Mar-21
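The changelog entry above notes that the default limit of 2000 can be customized using Views. For orientation, here is a minimal sketch of what such a configuration might look like, assuming the closure-based view API behind the `spec_unstable_metrics_views` feature flag; the instrument name and the limit of 500 are illustrative, not taken from this PR:

```rust
use opentelemetry_sdk::metrics::{Instrument, SdkMeterProvider, Stream};

fn main() {
    // Hypothetical view: cap one instrument's stream at 500 attribute sets
    // instead of the default 2000; all other instruments keep the default.
    let _provider = SdkMeterProvider::builder()
        .with_view(|i: &Instrument| {
            if i.name == "http.server.request.duration" {
                Some(Stream::new().cardinality_limit(500))
            } else {
                None
            }
        })
        .build();
    // Install the provider as usual and record measurements; nothing else changes.
}
```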
9 changes: 9 additions & 0 deletions opentelemetry-sdk/src/metrics/instrument.rs
@@ -203,6 +203,9 @@ pub struct Stream {
/// dropped. If the set is empty, all attributes will be dropped, if `None` all
/// attributes will be kept.
pub allowed_attribute_keys: Option<Arc<HashSet<Key>>>,

/// Cardinality limit for the stream.
pub cardinality_limit: Option<usize>,
}

#[cfg(feature = "spec_unstable_metrics_views")]
@@ -245,6 +248,12 @@ impl Stream {

self
}

/// Set the stream cardinality limit.
pub fn cardinality_limit(mut self, limit: usize) -> Self {
self.cardinality_limit = Some(limit);
self
}
}

/// The identifying properties of an instrument.
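Since `cardinality_limit` returns `Self`, it chains with the existing `Stream` setters (all gated behind `spec_unstable_metrics_views`). A small sketch under that assumption; the stream name is illustrative, and the assertion relies on `cardinality_limit` being a public field as shown in the diff above:

```rust
use opentelemetry_sdk::metrics::Stream;

fn main() {
    // Chain the new setter with an existing one; `None` (the field's default)
    // means the SDK-wide default limit of 2000 applies.
    let stream = Stream::new()
        .name("renamed_counter")
        .cardinality_limit(100);
    assert_eq!(stream.cardinality_limit, Some(100));
}
```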
60 changes: 40 additions & 20 deletions opentelemetry-sdk/src/metrics/internal/aggregate.rs
@@ -15,16 +15,6 @@ use super::{
precomputed_sum::PrecomputedSum, sum::Sum, Number,
};

pub(crate) const STREAM_CARDINALITY_LIMIT: usize = 2000;

/// Checks whether aggregator has hit cardinality limit for metric streams
pub(crate) fn is_under_cardinality_limit(_size: usize) -> bool {
true

// TODO: Implement this feature, after allowing the ability to customize the cardinality limit.
// size < STREAM_CARDINALITY_LIMIT
}

/// Receives measurements to be aggregated.
pub(crate) trait Measure<T>: Send + Sync + 'static {
fn call(&self, measurement: T, attrs: &[KeyValue]);
@@ -133,14 +123,22 @@ pub(crate) struct AggregateBuilder<T> {
/// measurements.
filter: AttributeSetFilter,

/// Cardinality limit for the metric stream
cardinality_limit: usize,

_marker: marker::PhantomData<T>,
}

impl<T: Number> AggregateBuilder<T> {
pub(crate) fn new(temporality: Temporality, filter: Option<Filter>) -> Self {
pub(crate) fn new(
temporality: Temporality,
filter: Option<Filter>,
cardinality_limit: usize,
) -> Self {
AggregateBuilder {
temporality,
filter: AttributeSetFilter::new(filter),
cardinality_limit,
_marker: marker::PhantomData,
}
}
@@ -150,18 +148,31 @@ impl<T: Number> AggregateBuilder<T> {
LastValue::new(
overwrite_temporality.unwrap_or(self.temporality),
self.filter.clone(),
self.cardinality_limit,
)
.into()
}

/// Builds a precomputed sum aggregate function input and output.
pub(crate) fn precomputed_sum(&self, monotonic: bool) -> AggregateFns<T> {
PrecomputedSum::new(self.temporality, self.filter.clone(), monotonic).into()
PrecomputedSum::new(
self.temporality,
self.filter.clone(),
monotonic,
self.cardinality_limit,
)
.into()
}

/// Builds a sum aggregate function input and output.
pub(crate) fn sum(&self, monotonic: bool) -> AggregateFns<T> {
Sum::new(self.temporality, self.filter.clone(), monotonic).into()
Sum::new(
self.temporality,
self.filter.clone(),
monotonic,
self.cardinality_limit,
)
.into()
}

/// Builds a histogram aggregate function input and output.
@@ -177,6 +188,7 @@ impl<T: Number> AggregateBuilder<T> {
boundaries,
record_min_max,
record_sum,
self.cardinality_limit,
)
.into()
}
@@ -196,6 +208,7 @@ impl<T: Number> AggregateBuilder<T> {
max_scale,
record_min_max,
record_sum,
self.cardinality_limit,
)
.into()
}
@@ -211,10 +224,13 @@ mod tests {

use super::*;

const CARDINALITY_LIMIT_DEFAULT: usize = 2000;

#[test]
fn last_value_aggregation() {
let AggregateFns { measure, collect } =
AggregateBuilder::<u64>::new(Temporality::Cumulative, None).last_value(None);
AggregateBuilder::<u64>::new(Temporality::Cumulative, None, CARDINALITY_LIMIT_DEFAULT)
.last_value(None);
let mut a = MetricData::Gauge(Gauge {
data_points: vec![GaugeDataPoint {
attributes: vec![KeyValue::new("a", 1)],
@@ -244,7 +260,8 @@ mod tests {
fn precomputed_sum_aggregation() {
for temporality in [Temporality::Delta, Temporality::Cumulative] {
let AggregateFns { measure, collect } =
AggregateBuilder::<u64>::new(temporality, None).precomputed_sum(true);
AggregateBuilder::<u64>::new(temporality, None, CARDINALITY_LIMIT_DEFAULT)
.precomputed_sum(true);
let mut a = MetricData::Sum(Sum {
data_points: vec![
SumDataPoint {
@@ -290,7 +307,8 @@ mod tests {
fn sum_aggregation() {
for temporality in [Temporality::Delta, Temporality::Cumulative] {
let AggregateFns { measure, collect } =
AggregateBuilder::<u64>::new(temporality, None).sum(true);
AggregateBuilder::<u64>::new(temporality, None, CARDINALITY_LIMIT_DEFAULT)
.sum(true);
let mut a = MetricData::Sum(Sum {
data_points: vec![
SumDataPoint {
@@ -335,8 +353,9 @@ mod tests {
#[test]
fn explicit_bucket_histogram_aggregation() {
for temporality in [Temporality::Delta, Temporality::Cumulative] {
let AggregateFns { measure, collect } = AggregateBuilder::<u64>::new(temporality, None)
.explicit_bucket_histogram(vec![1.0], true, true);
let AggregateFns { measure, collect } =
AggregateBuilder::<u64>::new(temporality, None, CARDINALITY_LIMIT_DEFAULT)
.explicit_bucket_histogram(vec![1.0], true, true);
let mut a = MetricData::Histogram(Histogram {
data_points: vec![HistogramDataPoint {
attributes: vec![KeyValue::new("a1", 1)],
@@ -382,8 +401,9 @@ mod tests {
#[test]
fn exponential_histogram_aggregation() {
for temporality in [Temporality::Delta, Temporality::Cumulative] {
let AggregateFns { measure, collect } = AggregateBuilder::<u64>::new(temporality, None)
.exponential_bucket_histogram(4, 20, true, true);
let AggregateFns { measure, collect } =
AggregateBuilder::<u64>::new(temporality, None, CARDINALITY_LIMIT_DEFAULT)
.exponential_bucket_histogram(4, 20, true, true);
let mut a = MetricData::ExponentialHistogram(ExponentialHistogram {
data_points: vec![ExponentialHistogramDataPoint {
attributes: vec![KeyValue::new("a1", 1)],
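`AggregateBuilder` now takes the limit at construction and forwards it to every aggregator it builds, replacing the removed `is_under_cardinality_limit` stub that unconditionally returned `true`. Conceptually, the check that now lives inside each aggregator's `ValueMap` behaves like the simplified sketch below: once the cap is reached, measurements carrying new attribute sets are folded into a single overflow series, following the spec's `otel.metric.overflow` convention. This is an illustration of the rule, not the PR's exact code:

```rust
use opentelemetry::KeyValue;

/// Simplified sketch: pick the series a measurement lands in, given that the
/// stream already tracks `distinct_series` attribute sets under this cap.
fn series_for(
    distinct_series: usize,
    cardinality_limit: usize, // e.g. the 2000 default
    incoming: Vec<KeyValue>,
) -> Vec<KeyValue> {
    if distinct_series < cardinality_limit {
        // Under the cap: the measurement starts (or updates) its own series.
        incoming
    } else {
        // At the cap: aggregate into the spec-defined overflow series.
        vec![KeyValue::new("otel.metric.overflow", true)]
    }
}

fn main() {
    let over = series_for(2000, 2000, vec![KeyValue::new("user.id", "u42")]);
    assert_eq!(over, vec![KeyValue::new("otel.metric.overflow", true)]);
}
```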
83 changes: 52 additions & 31 deletions opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs
@@ -369,12 +369,16 @@ impl<T: Number> ExpoHistogram<T> {
max_scale: i8,
record_min_max: bool,
record_sum: bool,
cardinality_limit: usize,
) -> Self {
ExpoHistogram {
value_map: ValueMap::new(BucketConfig {
max_size: max_size as i32,
max_scale,
}),
value_map: ValueMap::new(
BucketConfig {
max_size: max_size as i32,
max_scale,
},
cardinality_limit,
),
init_time: AggregateTimeInitiator::default(),
temporality,
filter,
@@ -546,6 +550,8 @@ mod tests {

use super::*;

const CARDINALITY_LIMIT_DEFAULT: usize = 2000;

#[test]
fn test_expo_histogram_data_point_record() {
run_data_point_record::<f64>();
@@ -710,6 +716,7 @@ mod tests {
20,
true,
true,
CARDINALITY_LIMIT_DEFAULT,
);
for v in test.values {
Measure::call(&h, v, &[]);
@@ -766,6 +773,7 @@ mod tests {
20,
true,
true,
CARDINALITY_LIMIT_DEFAULT,
);
for v in test.values {
Measure::call(&h, v, &[]);
@@ -1278,12 +1286,13 @@ mod tests {
TestCase {
name: "Delta Single",
build: Box::new(move || {
AggregateBuilder::new(Temporality::Delta, None).exponential_bucket_histogram(
max_size,
max_scale,
record_min_max,
record_sum,
)
AggregateBuilder::new(Temporality::Delta, None, CARDINALITY_LIMIT_DEFAULT)
.exponential_bucket_histogram(
max_size,
max_scale,
record_min_max,
record_sum,
)
}),
input: vec![vec![4, 4, 4, 2, 16, 1]
.into_iter()
@@ -1318,13 +1327,17 @@ mod tests {
TestCase {
name: "Cumulative Single",
build: Box::new(move || {
internal::AggregateBuilder::new(Temporality::Cumulative, None)
.exponential_bucket_histogram(
max_size,
max_scale,
record_min_max,
record_sum,
)
internal::AggregateBuilder::new(
Temporality::Cumulative,
None,
CARDINALITY_LIMIT_DEFAULT,
)
.exponential_bucket_histogram(
max_size,
max_scale,
record_min_max,
record_sum,
)
}),
input: vec![vec![4, 4, 4, 2, 16, 1]
.into_iter()
@@ -1359,13 +1372,17 @@ mod tests {
TestCase {
name: "Delta Multiple",
build: Box::new(move || {
internal::AggregateBuilder::new(Temporality::Delta, None)
.exponential_bucket_histogram(
max_size,
max_scale,
record_min_max,
record_sum,
)
internal::AggregateBuilder::new(
Temporality::Delta,
None,
CARDINALITY_LIMIT_DEFAULT,
)
.exponential_bucket_histogram(
max_size,
max_scale,
record_min_max,
record_sum,
)
}),
input: vec![
vec![2, 3, 8].into_iter().map(Into::into).collect(),
@@ -1403,13 +1420,17 @@ mod tests {
TestCase {
name: "Cumulative Multiple ",
build: Box::new(move || {
internal::AggregateBuilder::new(Temporality::Cumulative, None)
.exponential_bucket_histogram(
max_size,
max_scale,
record_min_max,
record_sum,
)
internal::AggregateBuilder::new(
Temporality::Cumulative,
None,
CARDINALITY_LIMIT_DEFAULT,
)
.exponential_bucket_histogram(
max_size,
max_scale,
record_min_max,
record_sum,
)
}),
input: vec![
vec![2, 3, 8].into_iter().map(Into::into).collect(),
4 changes: 3 additions & 1 deletion opentelemetry-sdk/src/metrics/internal/histogram.rs
@@ -87,6 +87,7 @@ impl<T: Number> Histogram<T> {
mut bounds: Vec<f64>,
record_min_max: bool,
record_sum: bool,
cardinality_limit: usize,
) -> Self {
#[cfg(feature = "spec_unstable_metrics_views")]
{
@@ -97,7 +98,7 @@ impl<T: Number> Histogram<T> {

let buckets_count = bounds.len() + 1;
Histogram {
value_map: ValueMap::new(buckets_count),
value_map: ValueMap::new(buckets_count, cardinality_limit),
init_time: AggregateTimeInitiator::default(),
temporality,
filter,
@@ -262,6 +263,7 @@ mod tests {
vec![1.0, 3.0, 6.0],
false,
false,
2000,
);
for v in 1..11 {
Measure::call(&hist, v, &[]);
8 changes: 6 additions & 2 deletions opentelemetry-sdk/src/metrics/internal/last_value.rs
@@ -50,9 +50,13 @@ pub(crate) struct LastValue<T: Number> {
}

impl<T: Number> LastValue<T> {
pub(crate) fn new(temporality: Temporality, filter: AttributeSetFilter) -> Self {
pub(crate) fn new(
temporality: Temporality,
filter: AttributeSetFilter,
cardinality_limit: usize,
) -> Self {
LastValue {
value_map: ValueMap::new(()),
value_map: ValueMap::new((), cardinality_limit),
init_time: AggregateTimeInitiator::default(),
temporality,
filter,