diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md
index 65db0dcc9e..aa3a58b9ba 100644
--- a/opentelemetry-sdk/CHANGELOG.md
+++ b/opentelemetry-sdk/CHANGELOG.md
@@ -12,6 +12,15 @@
   the suppression flag in their dedicated thread, so that telemetry generated
   from those threads will not be fed back into OTel. Similarly, `SimpleLogProcessor`
   was also modified to suppress telemetry before invoking exporters.
+- **Feature**: Implemented and enabled cardinality capping for Metrics by
+  default.
+  - The default cardinality limit is 2000 and can be customized using Views.
+  - This feature was previously removed in version 0.28 due to the lack of
+    configurability, but has now been reintroduced with the ability to
+    configure the limit.
+  - TODO/Placeholder: Add ability to configure cardinality limits via
+    Instrument advisory.
+
 ## 0.29.0

 Released 2025-Mar-21
diff --git a/opentelemetry-sdk/src/metrics/instrument.rs b/opentelemetry-sdk/src/metrics/instrument.rs
index cd97b0755e..559e9c5328 100644
--- a/opentelemetry-sdk/src/metrics/instrument.rs
+++ b/opentelemetry-sdk/src/metrics/instrument.rs
@@ -203,6 +203,9 @@ pub struct Stream {
     /// dropped. If the set is empty, all attributes will be dropped, if `None` all
     /// attributes will be kept.
     pub allowed_attribute_keys: Option<Arc<HashSet<Key>>>,
+
+    /// Cardinality limit for the stream.
+    pub cardinality_limit: Option<usize>,
 }

 #[cfg(feature = "spec_unstable_metrics_views")]
@@ -245,6 +248,12 @@ impl Stream {

         self
     }
+
+    /// Set the stream cardinality limit.
+    pub fn cardinality_limit(mut self, limit: usize) -> Self {
+        self.cardinality_limit = Some(limit);
+        self
+    }
 }

 /// The identifying properties of an instrument.
diff --git a/opentelemetry-sdk/src/metrics/internal/aggregate.rs b/opentelemetry-sdk/src/metrics/internal/aggregate.rs
index c07c56acca..e8e5cd433c 100644
--- a/opentelemetry-sdk/src/metrics/internal/aggregate.rs
+++ b/opentelemetry-sdk/src/metrics/internal/aggregate.rs
@@ -15,16 +15,6 @@ use super::{
     precomputed_sum::PrecomputedSum, sum::Sum, Number,
 };

-pub(crate) const STREAM_CARDINALITY_LIMIT: usize = 2000;
-
-/// Checks whether aggregator has hit cardinality limit for metric streams
-pub(crate) fn is_under_cardinality_limit(_size: usize) -> bool {
-    true
-
-    // TODO: Implement this feature, after allowing the ability to customize the cardinality limit.
-    // size < STREAM_CARDINALITY_LIMIT
-}
-
 /// Receives measurements to be aggregated.
 pub(crate) trait Measure<T>: Send + Sync + 'static {
     fn call(&self, measurement: T, attrs: &[KeyValue]);
@@ -133,14 +123,22 @@ pub(crate) struct AggregateBuilder<T> {
     /// measurements.
     filter: AttributeSetFilter,

+    /// Cardinality limit for the metric stream
+    cardinality_limit: usize,
+
     _marker: marker::PhantomData<T>,
 }

 impl<T: Number> AggregateBuilder<T> {
-    pub(crate) fn new(temporality: Temporality, filter: Option<Filter>) -> Self {
+    pub(crate) fn new(
+        temporality: Temporality,
+        filter: Option<Filter>,
+        cardinality_limit: usize,
+    ) -> Self {
         AggregateBuilder {
             temporality,
             filter: AttributeSetFilter::new(filter),
+            cardinality_limit,
             _marker: marker::PhantomData,
         }
     }
@@ -150,18 +148,31 @@ impl<T: Number> AggregateBuilder<T> {
         LastValue::new(
             overwrite_temporality.unwrap_or(self.temporality),
             self.filter.clone(),
+            self.cardinality_limit,
         )
         .into()
     }

     /// Builds a precomputed sum aggregate function input and output.
     pub(crate) fn precomputed_sum(&self, monotonic: bool) -> AggregateFns<T> {
-        PrecomputedSum::new(self.temporality, self.filter.clone(), monotonic).into()
+        PrecomputedSum::new(
+            self.temporality,
+            self.filter.clone(),
+            monotonic,
+            self.cardinality_limit,
+        )
+        .into()
     }

     /// Builds a sum aggregate function input and output.
     pub(crate) fn sum(&self, monotonic: bool) -> AggregateFns<T> {
-        Sum::new(self.temporality, self.filter.clone(), monotonic).into()
+        Sum::new(
+            self.temporality,
+            self.filter.clone(),
+            monotonic,
+            self.cardinality_limit,
+        )
+        .into()
     }

     /// Builds a histogram aggregate function input and output.
@@ -177,6 +188,7 @@ impl<T: Number> AggregateBuilder<T> {
             boundaries,
             record_min_max,
             record_sum,
+            self.cardinality_limit,
         )
         .into()
     }
@@ -196,6 +208,7 @@ impl<T: Number> AggregateBuilder<T> {
             max_scale,
             record_min_max,
             record_sum,
+            self.cardinality_limit,
         )
         .into()
     }
@@ -211,10 +224,13 @@ mod tests {

     use super::*;

+    const CARDINALITY_LIMIT_DEFAULT: usize = 2000;
+
     #[test]
     fn last_value_aggregation() {
         let AggregateFns { measure, collect } =
-            AggregateBuilder::<u64>::new(Temporality::Cumulative, None).last_value(None);
+            AggregateBuilder::<u64>::new(Temporality::Cumulative, None, CARDINALITY_LIMIT_DEFAULT)
+                .last_value(None);
         let mut a = MetricData::Gauge(Gauge {
             data_points: vec![GaugeDataPoint {
                 attributes: vec![KeyValue::new("a", 1)],
@@ -244,7 +260,8 @@ mod tests {
     fn precomputed_sum_aggregation() {
         for temporality in [Temporality::Delta, Temporality::Cumulative] {
             let AggregateFns { measure, collect } =
-                AggregateBuilder::<u64>::new(temporality, None).precomputed_sum(true);
+                AggregateBuilder::<u64>::new(temporality, None, CARDINALITY_LIMIT_DEFAULT)
+                    .precomputed_sum(true);
             let mut a = MetricData::Sum(Sum {
                 data_points: vec![
                     SumDataPoint {
@@ -290,7 +307,8 @@ mod tests {
     fn sum_aggregation() {
         for temporality in [Temporality::Delta, Temporality::Cumulative] {
             let AggregateFns { measure, collect } =
-                AggregateBuilder::<u64>::new(temporality, None).sum(true);
+                AggregateBuilder::<u64>::new(temporality, None, CARDINALITY_LIMIT_DEFAULT)
+                    .sum(true);
             let mut a = MetricData::Sum(Sum {
                 data_points: vec![
                     SumDataPoint {
@@ -335,8 +353,9 @@ mod tests {
     #[test]
     fn explicit_bucket_histogram_aggregation() {
         for temporality in [Temporality::Delta, Temporality::Cumulative] {
-            let AggregateFns { measure, collect } = AggregateBuilder::<u64>::new(temporality, None)
-                .explicit_bucket_histogram(vec![1.0], true, true);
+            let AggregateFns { measure, collect } =
+                AggregateBuilder::<u64>::new(temporality, None, CARDINALITY_LIMIT_DEFAULT)
+                    .explicit_bucket_histogram(vec![1.0], true, true);
             let mut a = MetricData::Histogram(Histogram {
                 data_points: vec![HistogramDataPoint {
                     attributes: vec![KeyValue::new("a1", 1)],
@@ -382,8 +401,9 @@ mod tests {
     #[test]
     fn exponential_histogram_aggregation() {
         for temporality in [Temporality::Delta, Temporality::Cumulative] {
-            let AggregateFns { measure, collect } = AggregateBuilder::<u64>::new(temporality, None)
-                .exponential_bucket_histogram(4, 20, true, true);
+            let AggregateFns { measure, collect } =
+                AggregateBuilder::<u64>::new(temporality, None, CARDINALITY_LIMIT_DEFAULT)
+                    .exponential_bucket_histogram(4, 20, true, true);
             let mut a = MetricData::ExponentialHistogram(ExponentialHistogram {
                 data_points: vec![ExponentialHistogramDataPoint {
                     attributes: vec![KeyValue::new("a1", 1)],
diff --git a/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs b/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs
index 30a20c0fa9..1a280a8918 100644
--- a/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs
+++ b/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs
@@ -369,12 +369,16 @@ impl<T: Number> ExpoHistogram<T> {
         max_scale: i8,
         record_min_max: bool,
         record_sum: bool,
+        cardinality_limit: usize,
     ) -> Self {
         ExpoHistogram {
-            value_map: ValueMap::new(BucketConfig {
-                max_size: max_size as i32,
-                max_scale,
-            }),
+            value_map: ValueMap::new(
+                BucketConfig {
+                    max_size: max_size as i32,
+                    max_scale,
+                },
+                cardinality_limit,
+            ),
             init_time: AggregateTimeInitiator::default(),
             temporality,
             filter,
@@ -546,6 +550,8 @@ mod tests {

     use super::*;

+    const CARDINALITY_LIMIT_DEFAULT: usize = 2000;
+
     #[test]
     fn test_expo_histogram_data_point_record() {
         run_data_point_record::<f64>();
@@ -710,6 +716,7 @@ mod tests {
             20,
             true,
             true,
+            CARDINALITY_LIMIT_DEFAULT,
         );
         for v in test.values {
             Measure::call(&h, v, &[]);
@@ -766,6 +773,7 @@ mod tests {
             20,
             true,
             true,
+            CARDINALITY_LIMIT_DEFAULT,
         );
         for v in test.values {
             Measure::call(&h, v, &[]);
@@ -1278,12 +1286,13 @@ mod tests {
             TestCase {
                 name: "Delta Single",
                 build: Box::new(move || {
-                    AggregateBuilder::new(Temporality::Delta, None).exponential_bucket_histogram(
-                        max_size,
-                        max_scale,
-                        record_min_max,
-                        record_sum,
-                    )
+                    AggregateBuilder::new(Temporality::Delta, None, CARDINALITY_LIMIT_DEFAULT)
+                        .exponential_bucket_histogram(
+                            max_size,
+                            max_scale,
+                            record_min_max,
+                            record_sum,
+                        )
                 }),
                 input: vec![vec![4, 4, 4, 2, 16, 1]
                     .into_iter()
@@ -1318,13 +1327,17 @@ mod tests {
             TestCase {
                 name: "Cumulative Single",
                 build: Box::new(move || {
-                    internal::AggregateBuilder::new(Temporality::Cumulative, None)
-                        .exponential_bucket_histogram(
-                            max_size,
-                            max_scale,
-                            record_min_max,
-                            record_sum,
-                        )
+                    internal::AggregateBuilder::new(
+                        Temporality::Cumulative,
+                        None,
+                        CARDINALITY_LIMIT_DEFAULT,
+                    )
+                    .exponential_bucket_histogram(
+                        max_size,
+                        max_scale,
+                        record_min_max,
+                        record_sum,
+                    )
                 }),
                 input: vec![vec![4, 4, 4, 2, 16, 1]
                     .into_iter()
                     .map(Into::into)
                     .collect()],
@@ -1359,13 +1372,17 @@ mod tests {
             TestCase {
                 name: "Delta Multiple",
                 build: Box::new(move || {
-                    internal::AggregateBuilder::new(Temporality::Delta, None)
-                        .exponential_bucket_histogram(
-                            max_size,
-                            max_scale,
-                            record_min_max,
-                            record_sum,
-                        )
+                    internal::AggregateBuilder::new(
+                        Temporality::Delta,
+                        None,
+                        CARDINALITY_LIMIT_DEFAULT,
+                    )
+                    .exponential_bucket_histogram(
+                        max_size,
+                        max_scale,
+                        record_min_max,
+                        record_sum,
+                    )
                 }),
                 input: vec![
                     vec![2, 3, 8].into_iter().map(Into::into).collect(),
@@ -1403,13 +1420,17 @@ mod tests {
             TestCase {
                 name: "Cumulative Multiple ",
                 build: Box::new(move || {
-                    internal::AggregateBuilder::new(Temporality::Cumulative, None)
-                        .exponential_bucket_histogram(
-                            max_size,
-                            max_scale,
-                            record_min_max,
-                            record_sum,
-                        )
+                    internal::AggregateBuilder::new(
+                        Temporality::Cumulative,
+                        None,
+                        CARDINALITY_LIMIT_DEFAULT,
+                    )
+                    .exponential_bucket_histogram(
+                        max_size,
+                        max_scale,
+                        record_min_max,
+                        record_sum,
+                    )
                 }),
                 input: vec![
                     vec![2, 3, 8].into_iter().map(Into::into).collect(),
diff --git a/opentelemetry-sdk/src/metrics/internal/histogram.rs b/opentelemetry-sdk/src/metrics/internal/histogram.rs
index 62f2e236e8..a085ec8958 100644
--- a/opentelemetry-sdk/src/metrics/internal/histogram.rs
+++ b/opentelemetry-sdk/src/metrics/internal/histogram.rs
@@ -87,6 +87,7 @@ impl<T: Number> Histogram<T> {
         mut bounds: Vec<f64>,
         record_min_max: bool,
         record_sum: bool,
+        cardinality_limit: usize,
     ) -> Self {
         #[cfg(feature = "spec_unstable_metrics_views")]
         {
@@ -97,7 +98,7 @@ impl<T: Number> Histogram<T> {
         let buckets_count = bounds.len() + 1;
         Histogram {
-            value_map: ValueMap::new(buckets_count),
+            value_map: ValueMap::new(buckets_count, cardinality_limit),
             init_time: AggregateTimeInitiator::default(),
             temporality,
             filter,
@@ -262,6 +263,7 @@ mod tests {
             vec![1.0, 3.0, 6.0],
             false,
             false,
+            2000,
         );
         for v in 1..11 {
             Measure::call(&hist, v, &[]);
diff --git a/opentelemetry-sdk/src/metrics/internal/last_value.rs b/opentelemetry-sdk/src/metrics/internal/last_value.rs
index 9d8576f15b..33a82bf78c 100644
--- a/opentelemetry-sdk/src/metrics/internal/last_value.rs
+++ b/opentelemetry-sdk/src/metrics/internal/last_value.rs
@@ -50,9 +50,13 @@ pub(crate) struct LastValue<T: Number> {
 }

 impl<T: Number> LastValue<T> {
-    pub(crate) fn new(temporality: Temporality, filter: AttributeSetFilter) -> Self {
+    pub(crate) fn new(
+        temporality: Temporality,
+        filter: AttributeSetFilter,
+        cardinality_limit: usize,
+    ) -> Self {
         LastValue {
-            value_map: ValueMap::new(()),
+            value_map: ValueMap::new((), cardinality_limit),
             init_time: AggregateTimeInitiator::default(),
             temporality,
             filter,
diff --git a/opentelemetry-sdk/src/metrics/internal/mod.rs b/opentelemetry-sdk/src/metrics/internal/mod.rs
index 5df1d65125..edfcbe6df7 100644
--- a/opentelemetry-sdk/src/metrics/internal/mod.rs
+++ b/opentelemetry-sdk/src/metrics/internal/mod.rs
@@ -12,7 +12,6 @@
 use std::ops::{Add, AddAssign, DerefMut, Sub};
 use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, AtomicUsize, Ordering};
 use std::sync::{Arc, OnceLock, RwLock};

-use aggregate::{is_under_cardinality_limit, STREAM_CARDINALITY_LIMIT};
 pub(crate) use aggregate::{AggregateBuilder, AggregateFns, ComputeAggregation, Measure};
 pub(crate) use exponential_histogram::{EXPO_MAX_SCALE, EXPO_MIN_SCALE};
 use opentelemetry::{otel_warn, KeyValue};
@@ -71,27 +70,34 @@ where
     no_attribute_tracker: A,
     /// Configuration for an Aggregator
     config: A::InitConfig,
+    cardinality_limit: usize,
 }

 impl<A> ValueMap<A>
 where
     A: Aggregator,
 {
-    fn new(config: A::InitConfig) -> Self {
+    fn new(config: A::InitConfig, cardinality_limit: usize) -> Self {
         ValueMap {
-            trackers: RwLock::new(HashMap::with_capacity(1 + STREAM_CARDINALITY_LIMIT)),
+            trackers: RwLock::new(HashMap::with_capacity(1 + cardinality_limit)),
             trackers_for_collect: OnceLock::new(),
             has_no_attribute_value: AtomicBool::new(false),
             no_attribute_tracker: A::create(&config),
             count: AtomicUsize::new(0),
             config,
+            cardinality_limit,
         }
     }

     #[inline]
     fn trackers_for_collect(&self) -> &RwLock<HashMap<Vec<KeyValue>, Arc<A>>> {
         self.trackers_for_collect
-            .get_or_init(|| RwLock::new(HashMap::with_capacity(1 + STREAM_CARDINALITY_LIMIT)))
+            .get_or_init(|| RwLock::new(HashMap::with_capacity(1 + self.cardinality_limit)))
+    }
+
+    /// Checks whether aggregator has hit cardinality limit for metric streams
+    fn is_under_cardinality_limit(&self) -> bool {
+        self.count.load(Ordering::SeqCst) < self.cardinality_limit
     }

     fn measure(&self, value: A::PreComputedValue, attributes: &[KeyValue]) {
@@ -131,7 +137,7 @@ where
             tracker.update(value);
         } else if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) {
             tracker.update(value);
-        } else if is_under_cardinality_limit(self.count.load(Ordering::SeqCst)) {
+        } else if self.is_under_cardinality_limit() {
             let new_tracker = Arc::new(A::create(&self.config));
             new_tracker.update(value);
diff --git a/opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs b/opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs
index 6e3dc7d541..fc372de007 100644
--- a/opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs
+++ b/opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs
@@ -23,9 +23,10 @@ impl<T: Number> PrecomputedSum<T> {
         temporality: Temporality,
         filter: AttributeSetFilter,
         monotonic: bool,
+        cardinality_limit: usize,
     ) -> Self {
         PrecomputedSum {
-            value_map: ValueMap::new(()),
+            value_map: ValueMap::new((), cardinality_limit),
             init_time: AggregateTimeInitiator::default(),
             temporality,
             filter,
diff --git a/opentelemetry-sdk/src/metrics/internal/sum.rs b/opentelemetry-sdk/src/metrics/internal/sum.rs
index 79ae5d711d..a55fbdada3 100644
--- a/opentelemetry-sdk/src/metrics/internal/sum.rs
+++ b/opentelemetry-sdk/src/metrics/internal/sum.rs
@@ -56,9 +56,10 @@ impl<T: Number> Sum<T> {
         temporality: Temporality,
         filter: AttributeSetFilter,
         monotonic: bool,
+        cardinality_limit: usize,
     ) -> Self {
         Sum {
-            value_map: ValueMap::new(()),
+            value_map: ValueMap::new((), cardinality_limit),
             init_time: AggregateTimeInitiator::default(),
             temporality,
             filter,
diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs
index 608ce62775..b2f6702695 100644
--- a/opentelemetry-sdk/src/metrics/mod.rs
+++ b/opentelemetry-sdk/src/metrics/mod.rs
@@ -366,16 +366,16 @@ mod tests {
         assert_eq!(data_point.value, 50, "Unexpected data point value");
     }

-    #[ignore = "https://github.com/open-telemetry/opentelemetry-rust/issues/1065"]
     #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
     async fn counter_aggregation_overflow_delta() {
         counter_aggregation_overflow_helper(Temporality::Delta);
+        counter_aggregation_overflow_helper_custom_limit(Temporality::Delta);
     }

-    #[ignore = "https://github.com/open-telemetry/opentelemetry-rust/issues/1065"]
     #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
     async fn counter_aggregation_overflow_cumulative() {
         counter_aggregation_overflow_helper(Temporality::Cumulative);
+        counter_aggregation_overflow_helper_custom_limit(Temporality::Cumulative);
     }

     #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
@@ -2412,6 +2412,160 @@ mod tests {
             empty_attrs_data_point.value, 6,
             "Empty attributes value should be 3+3=6"
         );
+
+        // Phase 2 - for delta temporality, data points are cleared after each
+        // collect, but for cumulative they are not.
+        test_context.reset_metrics();
+        // The following should be aggregated normally for Delta,
+        // and should go into overflow for Cumulative.
+        counter.add(100, &[KeyValue::new("A", "foo")]);
+        counter.add(100, &[KeyValue::new("A", "another")]);
+        counter.add(100, &[KeyValue::new("A", "yet_another")]);
+        test_context.flush_metrics();
+
+        let MetricData::Sum(sum) = test_context.get_aggregation::<u64>("my_counter", None) else {
+            unreachable!()
+        };
+
+        if temporality == Temporality::Delta {
+            assert_eq!(sum.data_points.len(), 3);
+
+            let data_point = find_sum_datapoint_with_key_value(&sum.data_points, "A", "foo")
+                .expect("point expected");
+            assert_eq!(data_point.value, 100);
+
+            let data_point = find_sum_datapoint_with_key_value(&sum.data_points, "A", "another")
+                .expect("point expected");
+            assert_eq!(data_point.value, 100);
+
+            let data_point =
+                find_sum_datapoint_with_key_value(&sum.data_points, "A", "yet_another")
+                    .expect("point expected");
+            assert_eq!(data_point.value, 100);
+        } else {
+            // For cumulative, the overflow point should still be there, and no new points should be added.
+            assert_eq!(sum.data_points.len(), 2002);
+            let data_point =
+                find_sum_datapoint_with_key_value(&sum.data_points, "otel.metric.overflow", "true")
+                    .expect("overflow point expected");
+            assert_eq!(data_point.value, 600);
+
+            let data_point = find_sum_datapoint_with_key_value(&sum.data_points, "A", "foo");
+            assert!(data_point.is_none(), "point should not be present");
+
+            let data_point = find_sum_datapoint_with_key_value(&sum.data_points, "A", "another");
+            assert!(data_point.is_none(), "point should not be present");
+
+            let data_point =
+                find_sum_datapoint_with_key_value(&sum.data_points, "A", "yet_another");
+            assert!(data_point.is_none(), "point should not be present");
+        }
+    }
+
+    fn counter_aggregation_overflow_helper_custom_limit(temporality: Temporality) {
+        // Arrange
+        let cardinality_limit = 2300;
+        let view_change_cardinality = move |i: &Instrument| {
+            if i.name == "my_counter" {
+                Some(
+                    Stream::new()
+                        .name("my_counter")
+                        .cardinality_limit(cardinality_limit),
+                )
+            } else {
+                None
+            }
+        };
+        let mut test_context = TestContext::new_with_view(temporality, view_change_cardinality);
+        let counter = test_context.u64_counter("test", "my_counter", None);
+
+        // Act
+        // Record measurements with A:0, A:1, ..., A:(cardinality_limit - 1),
+        // which just fits within the cardinality_limit.
+        for v in 0..cardinality_limit {
+            counter.add(100, &[KeyValue::new("A", v.to_string())]);
+        }
+
+        // The empty attribute set is treated specially and does not count towards the limit.
+        counter.add(3, &[]);
+        counter.add(3, &[]);
+
+        // All of the below will now go into overflow.
+        counter.add(100, &[KeyValue::new("A", "foo")]);
+        counter.add(100, &[KeyValue::new("A", "another")]);
+        counter.add(100, &[KeyValue::new("A", "yet_another")]);
+        test_context.flush_metrics();
+
+        let MetricData::Sum(sum) = test_context.get_aggregation::<u64>("my_counter", None) else {
+            unreachable!()
+        };
+
+        // Expecting (cardinality_limit + 1 overflow + 1 empty-attributes) data points.
+        assert_eq!(sum.data_points.len(), cardinality_limit + 1 + 1);
+
+        let data_point =
+            find_sum_datapoint_with_key_value(&sum.data_points, "otel.metric.overflow", "true")
+                .expect("overflow point expected");
+        assert_eq!(data_point.value, 300);
+
+        let empty_attrs_data_point = find_sum_datapoint_with_no_attributes(&sum.data_points)
+            .expect("Empty attributes point expected");
+        assert!(
+            empty_attrs_data_point.attributes.is_empty(),
+            "Non-empty attribute set"
+        );
+        assert_eq!(
+            empty_attrs_data_point.value, 6,
+            "Empty attributes value should be 3+3=6"
+        );
+
+        // Phase 2 - for delta temporality, data points are cleared after each
+        // collect, but for cumulative they are not.
+        test_context.reset_metrics();
+        // The following should be aggregated normally for Delta,
+        // and should go into overflow for Cumulative.
+        counter.add(100, &[KeyValue::new("A", "foo")]);
+        counter.add(100, &[KeyValue::new("A", "another")]);
+        counter.add(100, &[KeyValue::new("A", "yet_another")]);
+        test_context.flush_metrics();
+
+        let MetricData::Sum(sum) = test_context.get_aggregation::<u64>("my_counter", None) else {
+            unreachable!()
+        };
+
+        if temporality == Temporality::Delta {
+            assert_eq!(sum.data_points.len(), 3);
+
+            let data_point = find_sum_datapoint_with_key_value(&sum.data_points, "A", "foo")
+                .expect("point expected");
+            assert_eq!(data_point.value, 100);
+
+            let data_point = find_sum_datapoint_with_key_value(&sum.data_points, "A", "another")
+                .expect("point expected");
+            assert_eq!(data_point.value, 100);
+
+            let data_point =
+                find_sum_datapoint_with_key_value(&sum.data_points, "A", "yet_another")
+                    .expect("point expected");
+            assert_eq!(data_point.value, 100);
+        } else {
+            // For cumulative, the overflow point should still be there, and no new points should be added.
+            assert_eq!(sum.data_points.len(), cardinality_limit + 1 + 1);
+            let data_point =
+                find_sum_datapoint_with_key_value(&sum.data_points, "otel.metric.overflow", "true")
+                    .expect("overflow point expected");
+            assert_eq!(data_point.value, 600);
+
+            let data_point = find_sum_datapoint_with_key_value(&sum.data_points, "A", "foo");
+            assert!(data_point.is_none(), "point should not be present");
+
+            let data_point = find_sum_datapoint_with_key_value(&sum.data_points, "A", "another");
+            assert!(data_point.is_none(), "point should not be present");
+
+            let data_point =
+                find_sum_datapoint_with_key_value(&sum.data_points, "A", "yet_another");
+            assert!(data_point.is_none(), "point should not be present");
+        }
+    }

     fn counter_aggregation_attribute_order_helper(temporality: Temporality, start_sorted: bool) {
@@ -2664,6 +2818,21 @@ mod tests {
         }
     }

+        fn new_with_view<T: View>(temporality: Temporality, view: T) -> Self {
+            let exporter = InMemoryMetricExporterBuilder::new().with_temporality(temporality);
+            let exporter = exporter.build();
+            let meter_provider = SdkMeterProvider::builder()
+                .with_periodic_exporter(exporter.clone())
+                .with_view(view)
+                .build();
+
+            TestContext {
+                exporter,
+                meter_provider,
+                resource_metrics: vec![],
+            }
+        }
+
         fn u64_counter(
             &self,
             meter_name: &'static str,
diff --git a/opentelemetry-sdk/src/metrics/pipeline.rs b/opentelemetry-sdk/src/metrics/pipeline.rs
index 65d587e7cd..25fd023d8d 100644
--- a/opentelemetry-sdk/src/metrics/pipeline.rs
+++ b/opentelemetry-sdk/src/metrics/pipeline.rs
@@ -50,6 +50,8 @@ impl fmt::Debug for Pipeline {
 /// Single or multi-instrument callbacks
 type GenericCallback = Arc<dyn Fn() + Send + Sync>;

+const DEFAULT_CARDINALITY_LIMIT: usize = 2000;
+
 #[derive(Default)]
 struct PipelineInner {
     aggregations: HashMap<InstrumentationScope, Vec<InstrumentSync>>,
@@ -73,10 +75,6 @@ impl Pipeline {
     /// unique values.
     fn add_sync(&self, scope: InstrumentationScope, i_sync: InstrumentSync) {
         let _ = self.inner.lock().map(|mut inner| {
-            otel_debug!(
-                name : "InstrumentCreated",
-                instrument_name = i_sync.name.as_ref(),
-            );
             inner.aggregations.entry(scope).or_default().push(i_sync);
         });
     }
@@ -303,6 +301,7 @@ where
             unit: inst.unit,
             aggregation: None,
             allowed_attribute_keys: None,
+            cardinality_limit: None,
         };

         // Override default histogram boundaries if provided.
@@ -385,12 +384,25 @@ where
             .clone()
             .map(|allowed| Arc::new(move |kv: &KeyValue| allowed.contains(&kv.key)) as Arc<_>);

-        let b = AggregateBuilder::new(self.pipeline.reader.temporality(kind), filter);
+        let cardinality_limit = stream
+            .cardinality_limit
+            .unwrap_or(DEFAULT_CARDINALITY_LIMIT);
+        let b = AggregateBuilder::new(
+            self.pipeline.reader.temporality(kind),
+            filter,
+            cardinality_limit,
+        );
         let AggregateFns { measure, collect } = match aggregate_fn(b, &agg, kind) {
             Ok(Some(inst)) => inst,
             other => return other.map(|fs| fs.map(|inst| inst.measure)), // Drop aggregator or error
         };

+        otel_debug!(
+            name : "Metrics.InstrumentCreated",
+            instrument_name = stream.name.as_ref(),
+            cardinality_limit = cardinality_limit,
+        );
+
         self.pipeline.add_sync(
             scope.clone(),
             InstrumentSync {
diff --git a/opentelemetry-sdk/src/metrics/view.rs b/opentelemetry-sdk/src/metrics/view.rs
index ac89606fd0..98c9dd20c6 100644
--- a/opentelemetry-sdk/src/metrics/view.rs
+++ b/opentelemetry-sdk/src/metrics/view.rs
@@ -161,6 +161,7 @@ pub fn new_view(criteria: Instrument, mask: Stream) -> MetricResult<Box<dyn View>>
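
Usage sketch (not part of the patch): the example below shows how the new `Stream::cardinality_limit` would be wired up through a View, mirroring the `counter_aggregation_overflow_helper_custom_limit` test added above. It assumes the `spec_unstable_metrics_views` feature is enabled; the meter name, instrument name, and limit value are illustrative.

```rust
use opentelemetry::metrics::MeterProvider as _;
use opentelemetry::KeyValue;
use opentelemetry_sdk::metrics::{Instrument, SdkMeterProvider, Stream};

fn main() {
    // A View that raises the limit for one instrument; every other stream
    // keeps the default limit of 2000 (DEFAULT_CARDINALITY_LIMIT).
    let view = |i: &Instrument| {
        if i.name == "my_counter" {
            Some(Stream::new().name("my_counter").cardinality_limit(2300))
        } else {
            None
        }
    };

    let provider = SdkMeterProvider::builder().with_view(view).build();
    let counter = provider.meter("example").u64_counter("my_counter").build();

    // The first 2300 distinct attribute sets each get their own data point;
    // later ones are folded into a single overflow data point carrying the
    // attribute `otel.metric.overflow = "true"`.
    for v in 0..2400 {
        counter.add(1, &[KeyValue::new("A", v.to_string())]);
    }
}
```

Note that, as in the tests, measurements with an empty attribute set are tracked separately and do not count towards the limit.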