Skip to content

feat: Add ability to specify cardinality limit via Instrument advice #2903

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
6 changes: 3 additions & 3 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ those threads will not be fed back into OTel. Similarly, `SimpleLogProcessor`
also modified to suppress telemetry before invoking exporters.

- **Feature**: Implemented and enabled cardinality capping for Metrics by
default.
default. [#2901](https://github.com/open-telemetry/opentelemetry-rust/pull/2901)
- The default cardinality limit is 2000 and can be customized using Views.
- This feature was previously removed in version 0.28 due to the lack of
configurability but has now been reintroduced with the ability to configure
the limit.
- TODO/Placeholder: Add ability to configure cardinality limits via Instrument
advisory.
- There is ability to configure cardinality limits via Instrument
advisory. [#2903](https://github.com/open-telemetry/opentelemetry-rust/pull/2903)

- *Breaking* change for custom `MetricReader` authors.
The `shutdown_with_timeout` method is added to `MetricReader` trait.
Expand Down
14 changes: 12 additions & 2 deletions opentelemetry-sdk/src/metrics/meter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ impl SdkMeter {
builder.description,
builder.unit,
None,
builder.cardinality_limit,
)
.map(|i| Counter::new(Arc::new(i)))
{
Expand Down Expand Up @@ -138,6 +139,7 @@ impl SdkMeter {
builder.description,
builder.unit,
None,
builder.cardinality_limit,
) {
Ok(ms) => {
if ms.is_empty() {
Expand Down Expand Up @@ -197,6 +199,7 @@ impl SdkMeter {
builder.description,
builder.unit,
None,
builder.cardinality_limit,
) {
Ok(ms) => {
if ms.is_empty() {
Expand Down Expand Up @@ -256,6 +259,7 @@ impl SdkMeter {
builder.description,
builder.unit,
None,
builder.cardinality_limit,
) {
Ok(ms) => {
if ms.is_empty() {
Expand Down Expand Up @@ -317,6 +321,7 @@ impl SdkMeter {
builder.description,
builder.unit,
None,
builder.cardinality_limit,
)
.map(|i| UpDownCounter::new(Arc::new(i)))
{
Expand Down Expand Up @@ -361,6 +366,7 @@ impl SdkMeter {
builder.description,
builder.unit,
None,
builder.cardinality_limit,
)
.map(|i| Gauge::new(Arc::new(i)))
{
Expand Down Expand Up @@ -422,6 +428,7 @@ impl SdkMeter {
builder.description,
builder.unit,
builder.boundaries,
builder.cardinality_limit,
)
.map(|i| Histogram::new(Arc::new(i)))
{
Expand Down Expand Up @@ -654,8 +661,10 @@ where
description: Option<Cow<'static, str>>,
unit: Option<Cow<'static, str>>,
boundaries: Option<Vec<f64>>,
cardinality_limit: Option<usize>,
) -> MetricResult<ResolvedMeasures<T>> {
let aggregators = self.measures(kind, name, description, unit, boundaries)?;
let aggregators =
self.measures(kind, name, description, unit, boundaries, cardinality_limit)?;
Ok(ResolvedMeasures {
measures: aggregators,
})
Expand All @@ -668,6 +677,7 @@ where
description: Option<Cow<'static, str>>,
unit: Option<Cow<'static, str>>,
boundaries: Option<Vec<f64>>,
cardinality_limit: Option<usize>,
) -> MetricResult<Vec<Arc<dyn internal::Measure<T>>>> {
let inst = Instrument {
name,
Expand All @@ -677,7 +687,7 @@ where
scope: self.meter.scope.clone(),
};

self.resolve.measures(inst, boundaries)
self.resolve.measures(inst, boundaries, cardinality_limit)
}
}

Expand Down
101 changes: 101 additions & 0 deletions opentelemetry-sdk/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,12 +370,14 @@
async fn counter_aggregation_overflow_delta() {
counter_aggregation_overflow_helper(Temporality::Delta);
counter_aggregation_overflow_helper_custom_limit(Temporality::Delta);
counter_aggregation_overflow_helper_custom_limit_via_advice(Temporality::Delta);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn counter_aggregation_overflow_cumulative() {
counter_aggregation_overflow_helper(Temporality::Cumulative);
counter_aggregation_overflow_helper_custom_limit(Temporality::Cumulative);
counter_aggregation_overflow_helper_custom_limit_via_advice(Temporality::Cumulative);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
Expand Down Expand Up @@ -2568,6 +2570,105 @@
}
}

fn counter_aggregation_overflow_helper_custom_limit_via_advice(temporality: Temporality) {
// Arrange
let cardinality_limit = 2300;
let mut test_context = TestContext::new(temporality);
let meter = test_context.meter();
let counter = meter
.u64_counter("my_counter")
.with_cardinality_limit(cardinality_limit)
.build();

// Act
// Record measurements with A:0, A:1,.......A:cardinality_limit, which just fits in the cardinality_limit
for v in 0..cardinality_limit {
counter.add(100, &[KeyValue::new("A", v.to_string())]);
}

// Empty attributes is specially treated and does not count towards the limit.
counter.add(3, &[]);
counter.add(3, &[]);

// All of the below will now go into overflow.
counter.add(100, &[KeyValue::new("A", "foo")]);
counter.add(100, &[KeyValue::new("A", "another")]);
counter.add(100, &[KeyValue::new("A", "yet_another")]);
test_context.flush_metrics();

let MetricData::Sum(sum) = test_context.get_aggregation::<u64>("my_counter", None) else {
unreachable!()

Check warning on line 2600 in opentelemetry-sdk/src/metrics/mod.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/mod.rs#L2600

Added line #L2600 was not covered by tests
};

// Expecting (cardinality_limit + 1 overflow + Empty attributes) data points.
assert_eq!(sum.data_points.len(), cardinality_limit + 1 + 1);

let data_point =
find_sum_datapoint_with_key_value(&sum.data_points, "otel.metric.overflow", "true")
.expect("overflow point expected");
assert_eq!(data_point.value, 300);

// let empty_attrs_data_point = &sum.data_points[0];
let empty_attrs_data_point = find_sum_datapoint_with_no_attributes(&sum.data_points)
.expect("Empty attributes point expected");
assert!(
empty_attrs_data_point.attributes.is_empty(),
"Non-empty attribute set"

Check warning on line 2616 in opentelemetry-sdk/src/metrics/mod.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/mod.rs#L2616

Added line #L2616 was not covered by tests
);
assert_eq!(
empty_attrs_data_point.value, 6,
"Empty attributes value should be 3+3=6"

Check warning on line 2620 in opentelemetry-sdk/src/metrics/mod.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/mod.rs#L2620

Added line #L2620 was not covered by tests
);

// Phase 2 - for delta temporality, after each collect, data points are cleared
// but for cumulative, they are not cleared.
test_context.reset_metrics();
// The following should be aggregated normally for Delta,
// and should go into overflow for Cumulative.
counter.add(100, &[KeyValue::new("A", "foo")]);
counter.add(100, &[KeyValue::new("A", "another")]);
counter.add(100, &[KeyValue::new("A", "yet_another")]);
test_context.flush_metrics();

let MetricData::Sum(sum) = test_context.get_aggregation::<u64>("my_counter", None) else {
unreachable!()

Check warning on line 2634 in opentelemetry-sdk/src/metrics/mod.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/mod.rs#L2634

Added line #L2634 was not covered by tests
};

if temporality == Temporality::Delta {
assert_eq!(sum.data_points.len(), 3);

let data_point = find_sum_datapoint_with_key_value(&sum.data_points, "A", "foo")
.expect("point expected");
assert_eq!(data_point.value, 100);

let data_point = find_sum_datapoint_with_key_value(&sum.data_points, "A", "another")
.expect("point expected");
assert_eq!(data_point.value, 100);

let data_point =
find_sum_datapoint_with_key_value(&sum.data_points, "A", "yet_another")
.expect("point expected");
assert_eq!(data_point.value, 100);
} else {
// For cumulative, overflow should still be there, and new points should not be added.
assert_eq!(sum.data_points.len(), cardinality_limit + 1 + 1);
let data_point =
find_sum_datapoint_with_key_value(&sum.data_points, "otel.metric.overflow", "true")
.expect("overflow point expected");
assert_eq!(data_point.value, 600);

let data_point = find_sum_datapoint_with_key_value(&sum.data_points, "A", "foo");
assert!(data_point.is_none(), "point should not be present");

let data_point = find_sum_datapoint_with_key_value(&sum.data_points, "A", "another");
assert!(data_point.is_none(), "point should not be present");

let data_point =
find_sum_datapoint_with_key_value(&sum.data_points, "A", "yet_another");
assert!(data_point.is_none(), "point should not be present");
}
}

fn counter_aggregation_attribute_order_helper(temporality: Temporality, start_sorted: bool) {
// Arrange
let mut test_context = TestContext::new(temporality);
Expand Down
6 changes: 4 additions & 2 deletions opentelemetry-sdk/src/metrics/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ where
&self,
inst: Instrument,
boundaries: Option<&[f64]>,
cardinality_limit: Option<usize>,
) -> MetricResult<Vec<Arc<dyn internal::Measure<T>>>> {
let mut matched = false;
let mut measures = vec![];
Expand Down Expand Up @@ -304,7 +305,7 @@ where
unit: inst.unit,
aggregation: None,
allowed_attribute_keys: None,
cardinality_limit: None,
cardinality_limit,
};

// Override default histogram boundaries if provided.
Expand Down Expand Up @@ -726,11 +727,12 @@ where
&self,
id: Instrument,
boundaries: Option<Vec<f64>>,
cardinality_limit: Option<usize>,
) -> MetricResult<Vec<Arc<dyn internal::Measure<T>>>> {
let (mut measures, mut errs) = (vec![], vec![]);

for inserter in &self.inserters {
match inserter.instrument(id.clone(), boundaries.as_deref()) {
match inserter.instrument(id.clone(), boundaries.as_deref(), cardinality_limit) {
Ok(ms) => measures.extend(ms),
Err(err) => errs.push(err),
}
Expand Down
2 changes: 2 additions & 0 deletions opentelemetry/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ disable telemetry generation during their internal operations, ensuring more
predictable and efficient observability pipelines.

- re-export `tracing` for `internal-logs` feature to remove the need of adding `tracing` as a dependency
- Added ability to configure cardinality limits via Instrument
advisory. [#2903](https://github.com/open-telemetry/opentelemetry-rust/pull/2903)

## 0.29.1

Expand Down
39 changes: 39 additions & 0 deletions opentelemetry/src/metrics/instruments/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@
/// Unit of the Histogram.
pub unit: Option<Cow<'static, str>>,

/// Cardinality limit for the Histogram.
pub cardinality_limit: Option<usize>,

/// Bucket boundaries for the histogram.
pub boundaries: Option<Vec<f64>>,

Expand All @@ -60,6 +63,7 @@
name,
description: None,
unit: None,
cardinality_limit: None,
boundaries: None,
_marker: marker::PhantomData,
}
Expand All @@ -83,6 +87,14 @@
self
}

/// Set the cardinality limit for this Histogram.
/// Setting cardinality limit is optional. By default, the limit will be set
/// to 2000.
pub fn with_cardinality_limit(mut self, limit: usize) -> Self {
self.cardinality_limit = Some(limit);
self
}

Check warning on line 96 in opentelemetry/src/metrics/instruments/mod.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry/src/metrics/instruments/mod.rs#L93-L96

Added lines #L93 - L96 were not covered by tests

/// Set the boundaries for this histogram.
///
/// Setting boundaries is optional. By default, the boundaries are set to:
Expand Down Expand Up @@ -150,6 +162,9 @@
/// Unit of the instrument.
pub unit: Option<Cow<'static, str>>,

/// Cardinality limit for the instrument.
pub cardinality_limit: Option<usize>,

_marker: marker::PhantomData<T>,
}

Expand All @@ -161,6 +176,7 @@
name,
description: None,
unit: None,
cardinality_limit: None,
_marker: marker::PhantomData,
}
}
Expand All @@ -182,6 +198,14 @@
self.unit = Some(unit.into());
self
}

/// Set the cardinality limit for this instrument.
/// Setting cardinality limit is optional. By default, the limit will be set
/// to 2000.
pub fn with_cardinality_limit(mut self, limit: usize) -> Self {
self.cardinality_limit = Some(limit);
self
}
}

macro_rules! build_instrument {
Expand Down Expand Up @@ -211,6 +235,7 @@
.field("name", &self.name)
.field("description", &self.description)
.field("unit", &self.unit)
.field("cardinality_limit", &self.cardinality_limit)

Check warning on line 238 in opentelemetry/src/metrics/instruments/mod.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry/src/metrics/instruments/mod.rs#L238

Added line #L238 was not covered by tests
.field("kind", &std::any::type_name::<T>())
.finish()
}
Expand All @@ -222,6 +247,7 @@
.field("name", &self.name)
.field("description", &self.description)
.field("unit", &self.unit)
.field("cardinality_limit", &self.cardinality_limit)

Check warning on line 250 in opentelemetry/src/metrics/instruments/mod.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry/src/metrics/instruments/mod.rs#L250

Added line #L250 was not covered by tests
.field("boundaries", &self.boundaries)
.field(
"kind",
Expand Down Expand Up @@ -255,6 +281,9 @@
/// Unit of the instrument.
pub unit: Option<Cow<'static, str>>,

/// Cardinality limit for the instrument.
pub cardinality_limit: Option<usize>,

/// Callbacks to be called for this instrument.
pub callbacks: Vec<Callback<M>>,

Expand All @@ -269,6 +298,7 @@
name,
description: None,
unit: None,
cardinality_limit: None,
_inst: marker::PhantomData,
callbacks: Vec::new(),
}
Expand All @@ -292,6 +322,14 @@
self
}

/// Set the cardinality limit for this async instrument.
/// Setting cardinality limit is optional. By default, the limit will be set
/// to 2000.
pub fn with_cardinality_limit(mut self, limit: usize) -> Self {
self.cardinality_limit = Some(limit);
self
}

Check warning on line 331 in opentelemetry/src/metrics/instruments/mod.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry/src/metrics/instruments/mod.rs#L328-L331

Added lines #L328 - L331 were not covered by tests

/// Set the callback to be called for this instrument.
pub fn with_callback<F>(mut self, callback: F) -> Self
where
Expand Down Expand Up @@ -340,6 +378,7 @@
.field("name", &self.name)
.field("description", &self.description)
.field("unit", &self.unit)
.field("cardinality_limit", &self.cardinality_limit)

Check warning on line 381 in opentelemetry/src/metrics/instruments/mod.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry/src/metrics/instruments/mod.rs#L381

Added line #L381 was not covered by tests
.field("kind", &std::any::type_name::<I>())
.field("callbacks_len", &self.callbacks.len())
.finish()
Expand Down