Skip to content

Implement cardinality limits for metric streams #1066

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
May 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,13 @@

## Unreleased

### Added

- Implement cardinality limits for metric streams
[#1066](https://github.com/open-telemetry/opentelemetry-rust/pull/1066).

### Removed

- Samplers no longer has access to `InstrumentationLibrary` as one of parameters
to `should_sample`.
[#1041](https://github.com/open-telemetry/opentelemetry-rust/pull/1041).
Expand Down
21 changes: 21 additions & 0 deletions opentelemetry-sdk/benches/metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ fn bench_counter(
fn counters(c: &mut Criterion) {
let (cx, _, cntr) = bench_counter(None, "cumulative");
let (cx2, _, cntr2) = bench_counter(None, "delta");
let (cx3, _, cntr3) = bench_counter(None, "cumulative");

let mut group = c.benchmark_group("Counter");
group.bench_function("AddNoAttrs", |b| b.iter(|| cntr.add(&cx, 1, &[])));
Expand Down Expand Up @@ -278,6 +279,26 @@ fn counters(c: &mut Criterion) {
)
})
});

const MAX_DATA_POINTS: i64 = 2000;
let mut max_attributes: Vec<KeyValue> = Vec::new();

for i in 0..MAX_DATA_POINTS - 2 {
max_attributes.push(KeyValue::new(i.to_string(), i))
}

group.bench_function("AddOneTillMaxAttr", |b| {
b.iter(|| cntr3.add(&cx3, 1, &max_attributes))
});

for i in MAX_DATA_POINTS..MAX_DATA_POINTS * 2 {
max_attributes.push(KeyValue::new(i.to_string(), i))
}

group.bench_function("AddMaxAttr", |b| {
b.iter(|| cntr3.add(&cx3, 1, &max_attributes))
});

group.bench_function("AddInvalidAttr", |b| {
b.iter(|| cntr.add(&cx, 1, &[KeyValue::new("", "V"), KeyValue::new("K", "V")]))
});
Expand Down
14 changes: 13 additions & 1 deletion opentelemetry-sdk/src/metrics/internal/aggregator.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
use crate::{attributes::AttributeSet, metrics::data::Aggregation};
use once_cell::sync::Lazy;
use opentelemetry_api::KeyValue;
use std::sync::Arc;

use crate::{attributes::AttributeSet, metrics::data::Aggregation};
/// Maximum number of distinct attribute sets (data points) a single metric
/// stream may track; measurements beyond this are folded into the overflow
/// data point.
const STREAM_CARDINALITY_LIMIT: u32 = 2000;

/// Attribute set attached to the synthetic overflow data point that absorbs
/// measurements once a stream's cardinality limit has been reached.
pub(crate) static STREAM_OVERFLOW_ATTRIBUTE_SET: Lazy<AttributeSet> =
    Lazy::new(|| AttributeSet::from(&[KeyValue::new("otel.metric.overflow", "true")][..]));

/// Forms an aggregation from a collection of recorded measurements.
pub(crate) trait Aggregator<T>: Send + Sync {
Expand All @@ -15,6 +22,11 @@ pub(crate) trait Aggregator<T>: Send + Sync {
fn as_precompute_aggregator(&self) -> Option<Arc<dyn PrecomputeAggregator<T>>> {
None
}

/// Checks whether aggregator has hit cardinality limit for metric streams
fn is_under_cardinality_limit(&self, size: usize) -> bool {
size < STREAM_CARDINALITY_LIMIT as usize - 1
}
}

/// An `Aggregator` that receives values to aggregate that have been pre-computed by the caller.
Expand Down
53 changes: 35 additions & 18 deletions opentelemetry-sdk/src/metrics/internal/histogram.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::{
collections::HashMap,
collections::{hash_map::Entry, HashMap},
sync::{Arc, Mutex},
time::SystemTime,
};
Expand All @@ -9,8 +9,9 @@ use crate::metrics::{
aggregation,
data::{self, Aggregation},
};
use opentelemetry_api::{global, metrics::MetricsError};

use super::{Aggregator, Number};
use super::{aggregator::STREAM_OVERFLOW_ATTRIBUTE_SET, Aggregator, Number};

#[derive(Default)]
struct Buckets<T> {
Expand Down Expand Up @@ -78,22 +79,38 @@ where
Ok(guard) => guard,
Err(_) => return,
};

let b = values.entry(attrs).or_insert_with(|| {
// N+1 buckets. For example:
//
// bounds = [0, 5, 10]
//
// Then,
//
// buckets = (-∞, 0], (0, 5.0], (5.0, 10.0], (10.0, +∞)
let mut b = Buckets::new(self.bounds.len() + 1);
// Ensure min and max are recorded values (not zero), for new buckets.
(b.min, b.max) = (measurement, measurement);

b
});
b.bin(idx, measurement)
let size = values.len();

match values.entry(attrs) {
Entry::Occupied(mut occupied_entry) => occupied_entry.get_mut().bin(idx, measurement),
Entry::Vacant(vacant_entry) => {
if self.is_under_cardinality_limit(size) {
// N+1 buckets. For example:
//
// bounds = [0, 5, 10]
//
// Then,
//
// buckets = (-∞, 0], (0, 5.0], (5.0, 10.0], (10.0, +∞)
let mut b = Buckets::new(self.bounds.len() + 1);
// Ensure min and max are recorded values (not zero), for new buckets.
(b.min, b.max) = (measurement, measurement);
b.bin(idx, measurement);
vacant_entry.insert(b);
} else {
values
.entry(STREAM_OVERFLOW_ATTRIBUTE_SET.clone())
.and_modify(|val| val.bin(idx, measurement))
.or_insert_with(|| {
let mut b = Buckets::new(self.bounds.len() + 1);
(b.min, b.max) = (measurement, measurement);
b.bin(idx, measurement);
b
});
global::handle_error(MetricsError::Other("Warning: Maximum data points for metric stream exceeded. Entry added to overflow.".into()));
}
}
}
}

fn aggregation(&self) -> Option<Box<dyn Aggregation>> {
Expand Down
24 changes: 20 additions & 4 deletions opentelemetry-sdk/src/metrics/internal/last_value.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use std::{
collections::HashMap,
collections::{hash_map::Entry, HashMap},
sync::{Arc, Mutex},
time::SystemTime,
};

use crate::attributes::AttributeSet;
use crate::metrics::data::{self, Gauge};
use opentelemetry_api::{global, metrics::MetricsError};

use super::{Aggregator, Number};
use super::{aggregator::STREAM_OVERFLOW_ATTRIBUTE_SET, Aggregator, Number};

/// Timestamped measurement data.
struct DataPointValue<T> {
Expand All @@ -28,11 +29,26 @@ struct LastValue<T> {

impl<T: Number<T>> Aggregator<T> for LastValue<T> {
fn aggregate(&self, measurement: T, attrs: AttributeSet) {
let d = DataPointValue {
let d: DataPointValue<T> = DataPointValue {
timestamp: SystemTime::now(),
value: measurement,
};
let _ = self.values.lock().map(|mut values| values.insert(attrs, d));
if let Ok(mut values) = self.values.lock() {
let size = values.len();
match values.entry(attrs) {
Entry::Occupied(mut occupied_entry) => {
occupied_entry.insert(d);
}
Entry::Vacant(vacant_entry) => {
if self.is_under_cardinality_limit(size) {
vacant_entry.insert(d);
} else {
values.insert(STREAM_OVERFLOW_ATTRIBUTE_SET.clone(), d);
global::handle_error(MetricsError::Other("Warning: Maximum data points for metric stream exceeded. Entry added to overflow.".into()));
}
}
}
}
}

fn aggregation(&self) -> Option<Box<dyn crate::metrics::data::Aggregation>> {
Expand Down
62 changes: 48 additions & 14 deletions opentelemetry-sdk/src/metrics/internal/sum.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
use std::{
collections::HashMap,
collections::{hash_map::Entry, HashMap},
sync::{Arc, Mutex},
time::SystemTime,
};

use crate::attributes::AttributeSet;
use crate::metrics::data::{self, Aggregation, DataPoint, Temporality};
use opentelemetry_api::{global, metrics::MetricsError};

use super::{aggregator::PrecomputeAggregator, Aggregator, Number};
use super::{
aggregator::{PrecomputeAggregator, STREAM_OVERFLOW_ATTRIBUTE_SET},
Aggregator, Number,
};

/// The storage for sums.
#[derive(Default)]
Expand All @@ -26,10 +30,24 @@ impl<T: Number<T>> ValueMap<T> {
impl<T: Number<T>> Aggregator<T> for ValueMap<T> {
fn aggregate(&self, measurement: T, attrs: AttributeSet) {
if let Ok(mut values) = self.values.lock() {
values
.entry(attrs)
.and_modify(|val| *val += measurement)
.or_insert(measurement);
let size = values.len();
match values.entry(attrs) {
Entry::Occupied(mut occupied_entry) => {
let sum = occupied_entry.get_mut();
*sum += measurement;
}
Entry::Vacant(vacant_entry) => {
if self.is_under_cardinality_limit(size) {
vacant_entry.insert(measurement);
} else {
values
.entry(STREAM_OVERFLOW_ATTRIBUTE_SET.clone())
.and_modify(|val| *val += measurement)
.or_insert(measurement);
global::handle_error(MetricsError::Other("Warning: Maximum data points for metric stream exceeded. Entry added to overflow.".into()));
}
}
}
}
}

Expand Down Expand Up @@ -211,14 +229,30 @@ impl<T: Number<T>> Aggregator<T> for PrecomputedMap<T> {
Ok(guard) => guard,
Err(_) => return,
};

values
.entry(attrs)
.and_modify(|v| v.measured = measurement)
.or_insert(PrecomputedValue {
measured: measurement,
..Default::default()
});
let size = values.len();
match values.entry(attrs) {
Entry::Occupied(mut occupied_entry) => {
let v = occupied_entry.get_mut();
v.measured = measurement;
}
Entry::Vacant(vacant_entry) => {
if self.is_under_cardinality_limit(size) {
vacant_entry.insert(PrecomputedValue {
measured: measurement,
..Default::default()
});
} else {
values.insert(
STREAM_OVERFLOW_ATTRIBUTE_SET.clone(),
PrecomputedValue {
measured: measurement,
..Default::default()
},
);
global::handle_error(MetricsError::Other("Warning: Maximum data points for metric stream exceeded. Entry added to overflow.".into()));
}
}
}
}

fn aggregation(&self) -> Option<Box<dyn Aggregation>> {
Expand Down