Skip to content

Commit 166d217

Browse files
committed
Implement cardinality limits for metrics streams open-telemetry#1065
Signed-off-by: Benjamin Coenen <[email protected]>
1 parent da368d4 commit 166d217

File tree

3 files changed

+73
-41
lines changed

3 files changed

+73
-41
lines changed

opentelemetry-sdk/src/metrics/internal/aggregate.rs

Lines changed: 30 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,35 @@
1-
use std::{marker, sync::Arc};
1+
use std::marker;
2+
use std::sync::atomic::AtomicU64;
3+
use std::sync::Arc;
24

35
use once_cell::sync::Lazy;
46
use opentelemetry::KeyValue;
57

6-
use crate::{
7-
metrics::data::{Aggregation, Gauge, Temporality},
8-
metrics::AttributeSet,
9-
};
10-
11-
use super::{
12-
exponential_histogram::ExpoHistogram,
13-
histogram::Histogram,
14-
last_value::LastValue,
15-
sum::{PrecomputedSum, Sum},
16-
Number,
17-
};
18-
19-
const STREAM_CARDINALITY_LIMIT: u32 = 2000;
8+
use super::exponential_histogram::ExpoHistogram;
9+
use super::histogram::Histogram;
10+
use super::last_value::LastValue;
11+
use super::sum::PrecomputedSum;
12+
use super::sum::Sum;
13+
use super::Number;
14+
use crate::metrics::data::Aggregation;
15+
use crate::metrics::data::Gauge;
16+
use crate::metrics::data::Temporality;
17+
use crate::metrics::AttributeSet;
18+
19+
static STREAM_CARDINALITY_LIMIT: AtomicU64 = AtomicU64::new(2000);
2020
pub(crate) static STREAM_OVERFLOW_ATTRIBUTE_SET: Lazy<AttributeSet> = Lazy::new(|| {
2121
let key_values: [KeyValue; 1] = [KeyValue::new("otel.metric.overflow", "true")];
2222
AttributeSet::from(&key_values[..])
2323
});
2424

2525
/// Checks whether aggregator has hit cardinality limit for metric streams
2626
pub(crate) fn is_under_cardinality_limit(size: usize) -> bool {
27-
size < STREAM_CARDINALITY_LIMIT as usize
27+
size < STREAM_CARDINALITY_LIMIT.load(std::sync::atomic::Ordering::Relaxed) as usize
28+
}
29+
30+
/// Set cardinality limit for metric streams
31+
pub fn set_stream_cardinality_limit(size: u64) {
32+
STREAM_CARDINALITY_LIMIT.store(size, std::sync::atomic::Ordering::Relaxed)
2833
}
2934

3035
/// Receives measurements to be aggregated.
@@ -213,13 +218,17 @@ impl<T: Number<T>> AggregateBuilder<T> {
213218

214219
#[cfg(test)]
215220
mod tests {
216-
use crate::metrics::data::{
217-
DataPoint, ExponentialBucket, ExponentialHistogram, ExponentialHistogramDataPoint,
218-
Histogram, HistogramDataPoint, Sum,
219-
};
220-
use std::{time::SystemTime, vec};
221+
use std::time::SystemTime;
222+
use std::vec;
221223

222224
use super::*;
225+
use crate::metrics::data::DataPoint;
226+
use crate::metrics::data::ExponentialBucket;
227+
use crate::metrics::data::ExponentialHistogram;
228+
use crate::metrics::data::ExponentialHistogramDataPoint;
229+
use crate::metrics::data::Histogram;
230+
use crate::metrics::data::HistogramDataPoint;
231+
use crate::metrics::data::Sum;
223232

224233
#[test]
225234
fn last_value_aggregation() {

opentelemetry-sdk/src/metrics/internal/mod.rs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,20 @@ mod last_value;
55
mod sum;
66

77
use core::fmt;
8-
use std::ops::{Add, AddAssign, Sub};
9-
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
8+
use std::ops::Add;
9+
use std::ops::AddAssign;
10+
use std::ops::Sub;
11+
use std::sync::atomic::AtomicI64;
12+
use std::sync::atomic::AtomicU64;
13+
use std::sync::atomic::Ordering;
1014
use std::sync::Mutex;
1115

12-
pub(crate) use aggregate::{AggregateBuilder, ComputeAggregation, Measure};
13-
pub(crate) use exponential_histogram::{EXPO_MAX_SCALE, EXPO_MIN_SCALE};
16+
pub use aggregate::set_stream_cardinality_limit;
17+
pub(crate) use aggregate::AggregateBuilder;
18+
pub(crate) use aggregate::ComputeAggregation;
19+
pub(crate) use aggregate::Measure;
20+
pub(crate) use exponential_histogram::EXPO_MAX_SCALE;
21+
pub(crate) use exponential_histogram::EXPO_MIN_SCALE;
1422

1523
/// Marks a type that can have a value added and retrieved atomically. Required since
1624
/// different types have different backing atomic mechanisms

opentelemetry-sdk/src/metrics/mod.rs

Lines changed: 31 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -52,21 +52,24 @@ pub(crate) mod pipeline;
5252
pub mod reader;
5353
pub(crate) mod view;
5454

55+
use std::collections::hash_map::DefaultHasher;
56+
use std::collections::HashSet;
57+
use std::hash::Hash;
58+
use std::hash::Hasher;
59+
5560
pub use aggregation::*;
5661
pub use instrument::*;
62+
pub use internal::set_stream_cardinality_limit;
5763
pub use manual_reader::*;
5864
pub use meter::*;
5965
pub use meter_provider::*;
66+
use opentelemetry::Key;
67+
use opentelemetry::KeyValue;
68+
use opentelemetry::Value;
6069
pub use periodic_reader::*;
6170
pub use pipeline::Pipeline;
6271
pub use view::*;
6372

64-
use std::collections::hash_map::DefaultHasher;
65-
use std::collections::HashSet;
66-
use std::hash::{Hash, Hasher};
67-
68-
use opentelemetry::{Key, KeyValue, Value};
69-
7073
/// A unique set of attributes that can be used as instrument identifiers.
7174
///
7275
/// This must implement [Hash], [PartialEq], and [Eq] so it may be used as
@@ -139,20 +142,32 @@ impl Hash for AttributeSet {
139142

140143
#[cfg(all(test, feature = "testing"))]
141144
mod tests {
142-
use self::data::{DataPoint, HistogramDataPoint, ScopeMetrics};
143-
use super::*;
144-
use crate::metrics::data::{ResourceMetrics, Temporality};
145-
use crate::metrics::reader::TemporalitySelector;
146-
use crate::testing::metrics::InMemoryMetricsExporterBuilder;
147-
use crate::{runtime, testing::metrics::InMemoryMetricsExporter};
148-
use opentelemetry::metrics::{Counter, Meter, UpDownCounter};
149-
use opentelemetry::{metrics::MeterProvider as _, KeyValue};
150-
use rand::{rngs, Rng, SeedableRng};
151145
use std::borrow::Cow;
152-
use std::sync::{Arc, Mutex};
146+
use std::sync::Arc;
147+
use std::sync::Mutex;
153148
use std::thread;
154149
use std::time::Duration;
155150

151+
use opentelemetry::metrics::Counter;
152+
use opentelemetry::metrics::Meter;
153+
use opentelemetry::metrics::MeterProvider as _;
154+
use opentelemetry::metrics::UpDownCounter;
155+
use opentelemetry::KeyValue;
156+
use rand::rngs;
157+
use rand::Rng;
158+
use rand::SeedableRng;
159+
160+
use self::data::DataPoint;
161+
use self::data::HistogramDataPoint;
162+
use self::data::ScopeMetrics;
163+
use super::*;
164+
use crate::metrics::data::ResourceMetrics;
165+
use crate::metrics::data::Temporality;
166+
use crate::metrics::reader::TemporalitySelector;
167+
use crate::runtime;
168+
use crate::testing::metrics::InMemoryMetricsExporter;
169+
use crate::testing::metrics::InMemoryMetricsExporterBuilder;
170+
156171
// Run all tests in this mod
157172
// cargo test metrics::tests --features=testing
158173
// Note for all tests from this point onwards in this mod:

0 commit comments

Comments
 (0)