Skip to content

Commit c322a50

Browse files
authored
Histogram implementation cleanup (#2283)
1 parent 540c6f1 commit c322a50

File tree

1 file changed

+46
-125
lines changed

1 file changed

+46
-125
lines changed

opentelemetry-sdk/src/metrics/internal/histogram.rs

Lines changed: 46 additions & 125 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,7 @@ use opentelemetry::KeyValue;
99
use super::ValueMap;
1010
use super::{Aggregator, Number};
1111

12-
struct HistogramTracker<T> {
13-
buckets: Mutex<Buckets<T>>,
14-
}
15-
16-
impl<T> Aggregator for HistogramTracker<T>
12+
impl<T> Aggregator for Mutex<Buckets<T>>
1713
where
1814
T: Number,
1915
{
@@ -22,27 +18,26 @@ where
2218
type PreComputedValue = (T, usize);
2319

2420
fn update(&self, (value, index): (T, usize)) {
25-
let mut buckets = match self.buckets.lock() {
26-
Ok(guard) => guard,
27-
Err(_) => return,
28-
};
21+
let mut buckets = self.lock().unwrap_or_else(|err| err.into_inner());
2922

30-
buckets.bin(index, value);
31-
buckets.sum(value);
23+
buckets.total += value;
24+
buckets.count += 1;
25+
buckets.counts[index] += 1;
26+
if value < buckets.min {
27+
buckets.min = value;
28+
}
29+
if value > buckets.max {
30+
buckets.max = value
31+
}
3232
}
3333

3434
fn create(count: &usize) -> Self {
35-
HistogramTracker {
36-
buckets: Mutex::new(Buckets::<T>::new(*count)),
37-
}
35+
Mutex::new(Buckets::<T>::new(*count))
3836
}
3937

4038
fn clone_and_reset(&self, count: &usize) -> Self {
41-
let mut current = self.buckets.lock().unwrap_or_else(|err| err.into_inner());
42-
let cloned = replace(current.deref_mut(), Buckets::new(*count));
43-
Self {
44-
buckets: Mutex::new(cloned),
45-
}
39+
let mut current = self.lock().unwrap_or_else(|err| err.into_inner());
40+
Mutex::new(replace(current.deref_mut(), Buckets::new(*count)))
4641
}
4742
}
4843

@@ -65,62 +60,34 @@ impl<T: Number> Buckets<T> {
6560
..Default::default()
6661
}
6762
}
68-
69-
fn sum(&mut self, value: T) {
70-
self.total += value;
71-
}
72-
73-
fn bin(&mut self, idx: usize, value: T) {
74-
self.counts[idx] += 1;
75-
self.count += 1;
76-
if value < self.min {
77-
self.min = value;
78-
}
79-
if value > self.max {
80-
self.max = value
81-
}
82-
}
8363
}
8464

8565
/// Summarizes a set of measurements as a histogram with explicitly defined
8666
/// buckets.
8767
pub(crate) struct Histogram<T: Number> {
88-
value_map: ValueMap<HistogramTracker<T>>,
68+
value_map: ValueMap<Mutex<Buckets<T>>>,
8969
bounds: Vec<f64>,
9070
record_min_max: bool,
9171
record_sum: bool,
9272
start: Mutex<SystemTime>,
9373
}
9474

9575
impl<T: Number> Histogram<T> {
96-
pub(crate) fn new(boundaries: Vec<f64>, record_min_max: bool, record_sum: bool) -> Self {
97-
// TODO fix the bug, by first removing NaN and only then getting buckets_count
98-
// once we know the reason for performance degradation
99-
let buckets_count = boundaries.len() + 1;
100-
let mut histogram = Histogram {
76+
pub(crate) fn new(mut bounds: Vec<f64>, record_min_max: bool, record_sum: bool) -> Self {
77+
bounds.retain(|v| !v.is_nan());
78+
bounds.sort_by(|a, b| a.partial_cmp(b).expect("NaNs filtered out"));
79+
let buckets_count = bounds.len() + 1;
80+
Histogram {
10181
value_map: ValueMap::new(buckets_count),
102-
bounds: boundaries,
82+
bounds,
10383
record_min_max,
10484
record_sum,
10585
start: Mutex::new(SystemTime::now()),
106-
};
107-
108-
histogram.bounds.retain(|v| !v.is_nan());
109-
histogram
110-
.bounds
111-
.sort_by(|a, b| a.partial_cmp(b).expect("NaNs filtered out"));
112-
113-
histogram
86+
}
11487
}
11588

11689
pub(crate) fn measure(&self, measurement: T, attrs: &[KeyValue]) {
11790
let f = measurement.into_float();
118-
// Ignore NaN and infinity.
119-
// Only makes sense if T is f64, maybe this could be no-op for other cases?
120-
// TODO: uncomment once we know the reason for performance degradation
121-
// if f.is_infinite() || f.is_nan() {
122-
// return;
123-
// }
12491
// This search will return an index in the range `[0, bounds.len()]`, where
12592
// it will return `bounds.len()` if value is greater than the last element
12693
// of `bounds`. This aligns with the buckets in that the length of buckets
@@ -156,17 +123,14 @@ impl<T: Number> Histogram<T> {
156123

157124
self.value_map
158125
.collect_and_reset(&mut h.data_points, |attributes, aggr| {
159-
let b = aggr
160-
.buckets
161-
.into_inner()
162-
.unwrap_or_else(|err| err.into_inner());
126+
let b = aggr.into_inner().unwrap_or_else(|err| err.into_inner());
163127
HistogramDataPoint {
164128
attributes,
165129
start_time: prev_start,
166130
time: t,
167131
count: b.count,
168132
bounds: self.bounds.clone(),
169-
bucket_counts: b.counts.clone(),
133+
bucket_counts: b.counts,
170134
sum: if self.record_sum {
171135
b.total
172136
} else {
@@ -214,7 +178,7 @@ impl<T: Number> Histogram<T> {
214178

215179
self.value_map
216180
.collect_readonly(&mut h.data_points, |attributes, aggr| {
217-
let b = aggr.buckets.lock().unwrap_or_else(|err| err.into_inner());
181+
let b = aggr.lock().unwrap_or_else(|err| err.into_inner());
218182
HistogramDataPoint {
219183
attributes,
220184
start_time: prev_start,
@@ -245,68 +209,25 @@ impl<T: Number> Histogram<T> {
245209
}
246210
}
247211

248-
// TODO: uncomment once we know the reason for performance degradation
249-
// #[cfg(test)]
250-
// mod tests {
212+
#[cfg(test)]
213+
mod tests {
214+
use super::*;
251215

252-
// use super::*;
253-
254-
// #[test]
255-
// fn when_f64_is_nan_or_infinity_then_ignore() {
256-
// struct Expected {
257-
// min: f64,
258-
// max: f64,
259-
// sum: f64,
260-
// count: u64,
261-
// }
262-
// impl Expected {
263-
// fn new(min: f64, max: f64, sum: f64, count: u64) -> Self {
264-
// Expected {
265-
// min,
266-
// max,
267-
// sum,
268-
// count,
269-
// }
270-
// }
271-
// }
272-
// struct TestCase {
273-
// values: Vec<f64>,
274-
// expected: Expected,
275-
// }
276-
277-
// let test_cases = vec![
278-
// TestCase {
279-
// values: vec![2.0, 4.0, 1.0],
280-
// expected: Expected::new(1.0, 4.0, 7.0, 3),
281-
// },
282-
// TestCase {
283-
// values: vec![2.0, 4.0, 1.0, f64::INFINITY],
284-
// expected: Expected::new(1.0, 4.0, 7.0, 3),
285-
// },
286-
// TestCase {
287-
// values: vec![2.0, 4.0, 1.0, -f64::INFINITY],
288-
// expected: Expected::new(1.0, 4.0, 7.0, 3),
289-
// },
290-
// TestCase {
291-
// values: vec![2.0, f64::NAN, 4.0, 1.0],
292-
// expected: Expected::new(1.0, 4.0, 7.0, 3),
293-
// },
294-
// TestCase {
295-
// values: vec![4.0, 4.0, 4.0, 2.0, 16.0, 1.0],
296-
// expected: Expected::new(1.0, 16.0, 31.0, 6),
297-
// },
298-
// ];
299-
300-
// for test in test_cases {
301-
// let h = Histogram::new(vec![], true, true);
302-
// for v in test.values {
303-
// h.measure(v, &[]);
304-
// }
305-
// let res = h.value_map.no_attribute_tracker.buckets.lock().unwrap();
306-
// assert_eq!(test.expected.max, res.max);
307-
// assert_eq!(test.expected.min, res.min);
308-
// assert_eq!(test.expected.sum, res.total);
309-
// assert_eq!(test.expected.count, res.count);
310-
// }
311-
// }
312-
// }
216+
#[test]
217+
fn check_buckets_are_selected_correctly() {
218+
let hist = Histogram::<i64>::new(vec![1.0, 3.0, 6.0], false, false);
219+
for v in 1..11 {
220+
hist.measure(v, &[]);
221+
}
222+
let (count, dp) = hist.cumulative(None);
223+
let dp = dp.unwrap();
224+
let dp = dp.as_any().downcast_ref::<data::Histogram<i64>>().unwrap();
225+
assert_eq!(count, 1);
226+
assert_eq!(dp.data_points[0].count, 10);
227+
assert_eq!(dp.data_points[0].bucket_counts.len(), 4);
228+
assert_eq!(dp.data_points[0].bucket_counts[0], 1); // 1
229+
assert_eq!(dp.data_points[0].bucket_counts[1], 2); // 2, 3
230+
assert_eq!(dp.data_points[0].bucket_counts[2], 3); // 4, 5, 6
231+
assert_eq!(dp.data_points[0].bucket_counts[3], 4); // 7, 8, 9, 10
232+
}
233+
}

0 commit comments

Comments
 (0)