diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index 5948747d44..322bcab08e 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -13,13 +13,13 @@ those threads will not be fed back into OTel. Similarly, `SimpleLogProcessor` also modified to suppress telemetry before invoking exporters. - **Feature**: Implemented and enabled cardinality capping for Metrics by - default. + default. [#2901](https://github.com/open-telemetry/opentelemetry-rust/pull/2901) - The default cardinality limit is 2000 and can be customized using Views. - This feature was previously removed in version 0.28 due to the lack of configurability but has now been reintroduced with the ability to configure the limit. - - TODO/Placeholder: Add ability to configure cardinality limits via Instrument - advisory. + - Cardinality limits can also be configured via Instrument + advisory parameters. [#2903](https://github.com/open-telemetry/opentelemetry-rust/pull/2903) - *Breaking* change for custom `MetricReader` authors. The `shutdown_with_timeout` method is added to `MetricReader` trait. 
diff --git a/opentelemetry-sdk/src/metrics/meter.rs b/opentelemetry-sdk/src/metrics/meter.rs index b1d3bd6f4f..fe33c5ee74 100644 --- a/opentelemetry-sdk/src/metrics/meter.rs +++ b/opentelemetry-sdk/src/metrics/meter.rs @@ -96,6 +96,7 @@ impl SdkMeter { builder.description, builder.unit, None, + builder.cardinality_limit, ) .map(|i| Counter::new(Arc::new(i))) { @@ -138,6 +139,7 @@ impl SdkMeter { builder.description, builder.unit, None, + builder.cardinality_limit, ) { Ok(ms) => { if ms.is_empty() { @@ -197,6 +199,7 @@ impl SdkMeter { builder.description, builder.unit, None, + builder.cardinality_limit, ) { Ok(ms) => { if ms.is_empty() { @@ -256,6 +259,7 @@ impl SdkMeter { builder.description, builder.unit, None, + builder.cardinality_limit, ) { Ok(ms) => { if ms.is_empty() { @@ -317,6 +321,7 @@ impl SdkMeter { builder.description, builder.unit, None, + builder.cardinality_limit, ) .map(|i| UpDownCounter::new(Arc::new(i))) { @@ -361,6 +366,7 @@ impl SdkMeter { builder.description, builder.unit, None, + builder.cardinality_limit, ) .map(|i| Gauge::new(Arc::new(i))) { @@ -422,6 +428,7 @@ impl SdkMeter { builder.description, builder.unit, builder.boundaries, + builder.cardinality_limit, ) .map(|i| Histogram::new(Arc::new(i))) { @@ -654,8 +661,10 @@ where description: Option>, unit: Option>, boundaries: Option>, + cardinality_limit: Option, ) -> MetricResult> { - let aggregators = self.measures(kind, name, description, unit, boundaries)?; + let aggregators = + self.measures(kind, name, description, unit, boundaries, cardinality_limit)?; Ok(ResolvedMeasures { measures: aggregators, }) @@ -668,6 +677,7 @@ where description: Option>, unit: Option>, boundaries: Option>, + cardinality_limit: Option, ) -> MetricResult>>> { let inst = Instrument { name, @@ -677,7 +687,7 @@ where scope: self.meter.scope.clone(), }; - self.resolve.measures(inst, boundaries) + self.resolve.measures(inst, boundaries, cardinality_limit) } } diff --git a/opentelemetry-sdk/src/metrics/mod.rs 
b/opentelemetry-sdk/src/metrics/mod.rs index b2f6702695..04198a9099 100644 --- a/opentelemetry-sdk/src/metrics/mod.rs +++ b/opentelemetry-sdk/src/metrics/mod.rs @@ -370,12 +370,14 @@ mod tests { async fn counter_aggregation_overflow_delta() { counter_aggregation_overflow_helper(Temporality::Delta); counter_aggregation_overflow_helper_custom_limit(Temporality::Delta); + counter_aggregation_overflow_helper_custom_limit_via_advice(Temporality::Delta); } #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn counter_aggregation_overflow_cumulative() { counter_aggregation_overflow_helper(Temporality::Cumulative); counter_aggregation_overflow_helper_custom_limit(Temporality::Cumulative); + counter_aggregation_overflow_helper_custom_limit_via_advice(Temporality::Cumulative); } #[tokio::test(flavor = "multi_thread", worker_threads = 1)] @@ -2568,6 +2570,105 @@ mod tests { } } + fn counter_aggregation_overflow_helper_custom_limit_via_advice(temporality: Temporality) { + // Arrange + let cardinality_limit = 2300; + let mut test_context = TestContext::new(temporality); + let meter = test_context.meter(); + let counter = meter + .u64_counter("my_counter") + .with_cardinality_limit(cardinality_limit) + .build(); + + // Act + // Record measurements with A:0, A:1, ..., A:(cardinality_limit - 1), i.e. exactly cardinality_limit distinct attribute sets, which just fits in the cardinality_limit + for v in 0..cardinality_limit { + counter.add(100, &[KeyValue::new("A", v.to_string())]); + } + + // Empty attributes are treated specially and do not count towards the limit. + counter.add(3, &[]); + counter.add(3, &[]); + + // All of the below will now go into overflow. 
+ counter.add(100, &[KeyValue::new("A", "foo")]); + counter.add(100, &[KeyValue::new("A", "another")]); + counter.add(100, &[KeyValue::new("A", "yet_another")]); + test_context.flush_metrics(); + + let MetricData::Sum(sum) = test_context.get_aggregation::("my_counter", None) else { + unreachable!() + }; + + // Expecting (cardinality_limit + 1 overflow + Empty attributes) data points. + assert_eq!(sum.data_points.len(), cardinality_limit + 1 + 1); + + let data_point = + find_sum_datapoint_with_key_value(&sum.data_points, "otel.metric.overflow", "true") + .expect("overflow point expected"); + assert_eq!(data_point.value, 300); + + // let empty_attrs_data_point = &sum.data_points[0]; + let empty_attrs_data_point = find_sum_datapoint_with_no_attributes(&sum.data_points) + .expect("Empty attributes point expected"); + assert!( + empty_attrs_data_point.attributes.is_empty(), + "Non-empty attribute set" + ); + assert_eq!( + empty_attrs_data_point.value, 6, + "Empty attributes value should be 3+3=6" + ); + + // Phase 2 - for delta temporality, after each collect, data points are cleared + // but for cumulative, they are not cleared. + test_context.reset_metrics(); + // The following should be aggregated normally for Delta, + // and should go into overflow for Cumulative. 
+ counter.add(100, &[KeyValue::new("A", "foo")]); + counter.add(100, &[KeyValue::new("A", "another")]); + counter.add(100, &[KeyValue::new("A", "yet_another")]); + test_context.flush_metrics(); + + let MetricData::Sum(sum) = test_context.get_aggregation::("my_counter", None) else { + unreachable!() + }; + + if temporality == Temporality::Delta { + assert_eq!(sum.data_points.len(), 3); + + let data_point = find_sum_datapoint_with_key_value(&sum.data_points, "A", "foo") + .expect("point expected"); + assert_eq!(data_point.value, 100); + + let data_point = find_sum_datapoint_with_key_value(&sum.data_points, "A", "another") + .expect("point expected"); + assert_eq!(data_point.value, 100); + + let data_point = + find_sum_datapoint_with_key_value(&sum.data_points, "A", "yet_another") + .expect("point expected"); + assert_eq!(data_point.value, 100); + } else { + // For cumulative, overflow should still be there, and new points should not be added. + assert_eq!(sum.data_points.len(), cardinality_limit + 1 + 1); + let data_point = + find_sum_datapoint_with_key_value(&sum.data_points, "otel.metric.overflow", "true") + .expect("overflow point expected"); + assert_eq!(data_point.value, 600); + + let data_point = find_sum_datapoint_with_key_value(&sum.data_points, "A", "foo"); + assert!(data_point.is_none(), "point should not be present"); + + let data_point = find_sum_datapoint_with_key_value(&sum.data_points, "A", "another"); + assert!(data_point.is_none(), "point should not be present"); + + let data_point = + find_sum_datapoint_with_key_value(&sum.data_points, "A", "yet_another"); + assert!(data_point.is_none(), "point should not be present"); + } + } + fn counter_aggregation_attribute_order_helper(temporality: Temporality, start_sorted: bool) { // Arrange let mut test_context = TestContext::new(temporality); diff --git a/opentelemetry-sdk/src/metrics/pipeline.rs b/opentelemetry-sdk/src/metrics/pipeline.rs index 0a585722b8..e266ea0dc0 100644 --- 
a/opentelemetry-sdk/src/metrics/pipeline.rs +++ b/opentelemetry-sdk/src/metrics/pipeline.rs @@ -254,6 +254,7 @@ where &self, inst: Instrument, boundaries: Option<&[f64]>, + cardinality_limit: Option, ) -> MetricResult>>> { let mut matched = false; let mut measures = vec![]; @@ -304,7 +305,7 @@ where unit: inst.unit, aggregation: None, allowed_attribute_keys: None, - cardinality_limit: None, + cardinality_limit, }; // Override default histogram boundaries if provided. @@ -726,11 +727,12 @@ where &self, id: Instrument, boundaries: Option>, + cardinality_limit: Option, ) -> MetricResult>>> { let (mut measures, mut errs) = (vec![], vec![]); for inserter in &self.inserters { - match inserter.instrument(id.clone(), boundaries.as_deref()) { + match inserter.instrument(id.clone(), boundaries.as_deref(), cardinality_limit) { Ok(ms) => measures.extend(ms), Err(err) => errs.push(err), } diff --git a/opentelemetry/CHANGELOG.md b/opentelemetry/CHANGELOG.md index 28a10e0db0..45573d46ed 100644 --- a/opentelemetry/CHANGELOG.md +++ b/opentelemetry/CHANGELOG.md @@ -24,6 +24,8 @@ disable telemetry generation during their internal operations, ensuring more predictable and efficient observability pipelines. - re-export `tracing` for `internal-logs` feature to remove the need of adding `tracing` as a dependency +- Added ability to configure cardinality limits via Instrument + advisory. [#2903](https://github.com/open-telemetry/opentelemetry-rust/pull/2903) ## 0.29.1 diff --git a/opentelemetry/src/metrics/instruments/mod.rs b/opentelemetry/src/metrics/instruments/mod.rs index 48d238ef5b..1c5d5561d2 100644 --- a/opentelemetry/src/metrics/instruments/mod.rs +++ b/opentelemetry/src/metrics/instruments/mod.rs @@ -45,6 +45,9 @@ pub struct HistogramBuilder<'a, T> { /// Unit of the Histogram. pub unit: Option>, + /// Cardinality limit for the Histogram. + pub cardinality_limit: Option, + /// Bucket boundaries for the histogram. 
pub boundaries: Option>, @@ -60,6 +63,7 @@ impl<'a, T> HistogramBuilder<'a, T> { name, description: None, unit: None, + cardinality_limit: None, boundaries: None, _marker: marker::PhantomData, } @@ -83,6 +87,14 @@ impl<'a, T> HistogramBuilder<'a, T> { self } + /// Set the cardinality limit for this Histogram. + /// Setting cardinality limit is optional. By default, the limit will be set + /// to 2000. + pub fn with_cardinality_limit(mut self, limit: usize) -> Self { + self.cardinality_limit = Some(limit); + self + } + /// Set the boundaries for this histogram. /// /// Setting boundaries is optional. By default, the boundaries are set to: @@ -150,6 +162,9 @@ pub struct InstrumentBuilder<'a, T> { /// Unit of the instrument. pub unit: Option>, + /// Cardinality limit for the instrument. + pub cardinality_limit: Option, + _marker: marker::PhantomData, } @@ -161,6 +176,7 @@ impl<'a, T> InstrumentBuilder<'a, T> { name, description: None, unit: None, + cardinality_limit: None, _marker: marker::PhantomData, } } @@ -182,6 +198,14 @@ impl<'a, T> InstrumentBuilder<'a, T> { self.unit = Some(unit.into()); self } + + /// Set the cardinality limit for this instrument. + /// Setting cardinality limit is optional. By default, the limit will be set + /// to 2000. + pub fn with_cardinality_limit(mut self, limit: usize) -> Self { + self.cardinality_limit = Some(limit); + self + } } macro_rules! 
build_instrument { @@ -211,6 +235,7 @@ impl fmt::Debug for InstrumentBuilder<'_, T> { .field("name", &self.name) .field("description", &self.description) .field("unit", &self.unit) + .field("cardinality_limit", &self.cardinality_limit) .field("kind", &std::any::type_name::()) .finish() } @@ -222,6 +247,7 @@ impl fmt::Debug for HistogramBuilder<'_, T> { .field("name", &self.name) .field("description", &self.description) .field("unit", &self.unit) + .field("cardinality_limit", &self.cardinality_limit) .field("boundaries", &self.boundaries) .field( "kind", @@ -255,6 +281,9 @@ pub struct AsyncInstrumentBuilder<'a, I, M> { /// Unit of the instrument. pub unit: Option>, + /// Cardinality limit for the instrument. + pub cardinality_limit: Option, + /// Callbacks to be called for this instrument. pub callbacks: Vec>, @@ -269,6 +298,7 @@ impl<'a, I, M> AsyncInstrumentBuilder<'a, I, M> { name, description: None, unit: None, + cardinality_limit: None, _inst: marker::PhantomData, callbacks: Vec::new(), } @@ -292,6 +322,14 @@ impl<'a, I, M> AsyncInstrumentBuilder<'a, I, M> { self } + /// Set the cardinality limit for this async instrument. + /// Setting cardinality limit is optional. By default, the limit will be set + /// to 2000. + pub fn with_cardinality_limit(mut self, limit: usize) -> Self { + self.cardinality_limit = Some(limit); + self + } + /// Set the callback to be called for this instrument. pub fn with_callback(mut self, callback: F) -> Self where @@ -340,6 +378,7 @@ where .field("name", &self.name) .field("description", &self.description) .field("unit", &self.unit) + .field("cardinality_limit", &self.cardinality_limit) .field("kind", &std::any::type_name::()) .field("callbacks_len", &self.callbacks.len()) .finish()