Skip to content

Commit 09ebfdc

Browse files
committed
Generic metrics collector
1 parent 6cc327d commit 09ebfdc

File tree

5 files changed

+357
-187
lines changed

5 files changed

+357
-187
lines changed

opentelemetry-sdk/src/metrics/data/mod.rs

+16
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,14 @@ pub trait Aggregation: fmt::Debug + any::Any + Send + Sync {
5353
fn as_mut(&mut self) -> &mut dyn any::Any;
5454
}
5555

56+
/// Allow to access data points of an [Aggregation].
57+
pub trait AggregationDataPoints {
58+
/// The type of data point in the aggregation.
59+
type Point;
60+
/// The data points of the aggregation.
61+
fn points(&mut self) -> &mut Vec<Self::Point>;
62+
}
63+
5664
/// DataPoint is a single data point in a time series.
5765
#[derive(Debug, PartialEq)]
5866
pub struct GaugeDataPoint<T> {
@@ -228,6 +236,14 @@ impl<T: fmt::Debug + Send + Sync + 'static> Aggregation for ExponentialHistogram
228236
}
229237
}
230238

239+
impl<T> AggregationDataPoints for ExponentialHistogram<T> {
240+
type Point = ExponentialHistogramDataPoint<T>;
241+
242+
fn points(&mut self) -> &mut Vec<Self::Point> {
243+
&mut self.data_points
244+
}
245+
}
246+
231247
/// A single exponential histogram data point in a time series.
232248
#[derive(Debug, PartialEq)]
233249
pub struct ExponentialHistogramDataPoint<T> {

opentelemetry-sdk/src/metrics/internal/aggregate.rs

+44-12
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,19 @@ use std::{
88

99
use opentelemetry::KeyValue;
1010

11-
use crate::metrics::{data::Aggregation, Temporality};
11+
use crate::metrics::{
12+
data::{Aggregation, AggregationDataPoints},
13+
Temporality,
14+
};
1215

1316
use super::{
14-
exponential_histogram::ExpoHistogram, histogram::Histogram, last_value::LastValue,
15-
precomputed_sum::PrecomputedSum, sum::Sum, Number,
17+
collector::{Collector, CumulativeValueMap, DeltaValueMap},
18+
exponential_histogram::{ExpoHistogram, ExpoHistogramBucketConfig},
19+
histogram::Histogram,
20+
last_value::LastValue,
21+
precomputed_sum::PrecomputedSum,
22+
sum::Sum,
23+
Number,
1624
};
1725

1826
pub(crate) const STREAM_CARDINALITY_LIMIT: usize = 2000;
@@ -58,6 +66,7 @@ where
5866
}
5967
}
6068

69+
#[derive(Clone, Copy)]
6170
pub(crate) struct AggregateTime {
6271
pub start: SystemTime,
6372
pub current: SystemTime,
@@ -121,6 +130,12 @@ impl AttributeSetFilter {
121130
}
122131
}
123132

133+
pub(crate) trait InitAggregationData {
134+
type Aggr: Aggregation + AggregationDataPoints;
135+
fn create_new(&self, time: AggregateTime) -> Self::Aggr;
136+
fn reset_existing(&self, existing: &mut Self::Aggr, time: AggregateTime);
137+
}
138+
124139
/// Builds aggregate functions
125140
pub(crate) struct AggregateBuilder<T> {
126141
/// The temporality used for the returned aggregate functions.
@@ -182,15 +197,32 @@ impl<T: Number> AggregateBuilder<T> {
182197
record_min_max: bool,
183198
record_sum: bool,
184199
) -> AggregateFns<T> {
185-
ExpoHistogram::new(
186-
self.temporality,
187-
self.filter.clone(),
188-
max_size,
189-
max_scale,
190-
record_min_max,
191-
record_sum,
192-
)
193-
.into()
200+
match self.temporality {
201+
Temporality::Delta => ExpoHistogram {
202+
aggregate_collector: Collector::new(
203+
self.filter.clone(),
204+
DeltaValueMap::new(ExpoHistogramBucketConfig {
205+
max_size: max_size as i32,
206+
max_scale,
207+
}),
208+
),
209+
record_min_max,
210+
record_sum,
211+
}
212+
.into(),
213+
_ => ExpoHistogram {
214+
aggregate_collector: Collector::new(
215+
self.filter.clone(),
216+
CumulativeValueMap::new(ExpoHistogramBucketConfig {
217+
max_size: max_size as i32,
218+
max_scale,
219+
}),
220+
),
221+
record_min_max,
222+
record_sum,
223+
}
224+
.into(),
225+
}
194226
}
195227
}
196228

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
use opentelemetry::KeyValue;
2+
3+
use crate::metrics::{
4+
data::{Aggregation, AggregationDataPoints},
5+
Temporality,
6+
};
7+
8+
use super::{
9+
aggregate::{AggregateTime, AttributeSetFilter},
10+
AggregateTimeInitiator, Aggregator, InitAggregationData, ValueMap,
11+
};
12+
13+
/// Aggregate measurements for attribute sets and collect these aggregates into data points for specific temporality
14+
pub(crate) trait AggregateMap: Send + Sync + 'static {
15+
const TEMPORALITY: Temporality;
16+
type Aggr: Aggregator;
17+
18+
fn measure(&self, value: <Self::Aggr as Aggregator>::PreComputedValue, attributes: &[KeyValue]);
19+
20+
fn collect_data_points<DP, MapFn>(&self, dest: &mut Vec<DP>, map_fn: MapFn)
21+
where
22+
MapFn: FnMut(Vec<KeyValue>, &Self::Aggr) -> DP;
23+
}
24+
25+
/// Higher level abstraction (compared to [`AggregateMap`]) that also does the filtering and collection into aggregation data
26+
pub(crate) trait AggregateCollector: Send + Sync + 'static {
27+
const TEMPORALITY: Temporality;
28+
type Aggr: Aggregator;
29+
30+
fn measure(&self, value: <Self::Aggr as Aggregator>::PreComputedValue, attributes: &[KeyValue]);
31+
32+
fn collect<InitAggregate, F>(
33+
&self,
34+
aggregate: &InitAggregate,
35+
dest: Option<&mut dyn Aggregation>,
36+
create_point: F,
37+
) -> (usize, Option<Box<dyn Aggregation>>)
38+
where
39+
InitAggregate: InitAggregationData,
40+
F: FnMut(
41+
Vec<KeyValue>,
42+
&Self::Aggr,
43+
) -> <InitAggregate::Aggr as AggregationDataPoints>::Point;
44+
}
45+
46+
pub(crate) struct Collector<AM> {
47+
filter: AttributeSetFilter,
48+
aggregate_map: AM,
49+
time: AggregateTimeInitiator,
50+
}
51+
52+
impl<AM> Collector<AM>
53+
where
54+
AM: AggregateMap,
55+
{
56+
pub(crate) fn new(filter: AttributeSetFilter, aggregate_map: AM) -> Self {
57+
Self {
58+
filter,
59+
aggregate_map,
60+
time: AggregateTimeInitiator::default(),
61+
}
62+
}
63+
64+
fn init_time(&self) -> AggregateTime {
65+
if let Temporality::Delta = AM::TEMPORALITY {
66+
self.time.delta()
67+
} else {
68+
self.time.cumulative()
69+
}
70+
}
71+
}
72+
73+
impl<AM> AggregateCollector for Collector<AM>
74+
where
75+
AM: AggregateMap,
76+
{
77+
const TEMPORALITY: Temporality = AM::TEMPORALITY;
78+
79+
type Aggr = AM::Aggr;
80+
81+
fn measure(&self, value: <AM::Aggr as Aggregator>::PreComputedValue, attributes: &[KeyValue]) {
82+
self.filter.apply(attributes, |filtered_attrs| {
83+
self.aggregate_map.measure(value, filtered_attrs);
84+
});
85+
}
86+
87+
fn collect<InitAggregate, F>(
88+
&self,
89+
aggregate: &InitAggregate,
90+
dest: Option<&mut dyn Aggregation>,
91+
create_point: F,
92+
) -> (usize, Option<Box<dyn Aggregation>>)
93+
where
94+
InitAggregate: InitAggregationData,
95+
F: FnMut(Vec<KeyValue>, &AM::Aggr) -> <InitAggregate::Aggr as AggregationDataPoints>::Point,
96+
{
97+
let time = self.init_time();
98+
let s_data = dest.and_then(|d| d.as_mut().downcast_mut::<InitAggregate::Aggr>());
99+
let mut new_agg = if s_data.is_none() {
100+
Some(aggregate.create_new(time))
101+
} else {
102+
None
103+
};
104+
let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none"));
105+
aggregate.reset_existing(s_data, time);
106+
self.aggregate_map
107+
.collect_data_points(s_data.points(), create_point);
108+
109+
(
110+
s_data.points().len(),
111+
new_agg.map(|a| Box::new(a) as Box<_>),
112+
)
113+
}
114+
}
115+
116+
/// At the moment use [`ValueMap`] under the hood (which support both Delta and Cumulative), to implement `AggregateMap` for Delta temporality
117+
/// Later this could be improved to support only Delta temporality
118+
pub(crate) struct DeltaValueMap<A>(ValueMap<A>)
119+
where
120+
A: Aggregator;
121+
122+
impl<A> DeltaValueMap<A>
123+
where
124+
A: Aggregator,
125+
{
126+
pub(crate) fn new(config: A::InitConfig) -> Self {
127+
Self(ValueMap::new(config))
128+
}
129+
}
130+
131+
impl<A> AggregateMap for DeltaValueMap<A>
132+
where
133+
A: Aggregator,
134+
<A as Aggregator>::InitConfig: Send + Sync,
135+
{
136+
const TEMPORALITY: Temporality = Temporality::Delta;
137+
138+
type Aggr = A;
139+
140+
fn measure(
141+
&self,
142+
value: <Self::Aggr as Aggregator>::PreComputedValue,
143+
attributes: &[KeyValue],
144+
) {
145+
self.0.measure(value, attributes);
146+
}
147+
148+
fn collect_data_points<DP, MapFn>(&self, dest: &mut Vec<DP>, mut map_fn: MapFn)
149+
where
150+
MapFn: FnMut(Vec<KeyValue>, &Self::Aggr) -> DP,
151+
{
152+
self.0
153+
.collect_and_reset(dest, |attributes, aggr| map_fn(attributes, &aggr));
154+
}
155+
}
156+
157+
/// At the moment use [`ValueMap`] under the hood (which support both Delta and Cumulative), to implement `AggregateMap` for Cumulative temporality
158+
/// Later this could be improved to support only Cumulative temporality
159+
pub(crate) struct CumulativeValueMap<A>(ValueMap<A>)
160+
where
161+
A: Aggregator;
162+
163+
impl<A> CumulativeValueMap<A>
164+
where
165+
A: Aggregator,
166+
{
167+
pub(crate) fn new(config: A::InitConfig) -> Self {
168+
Self(ValueMap::new(config))
169+
}
170+
}
171+
172+
impl<A> AggregateMap for CumulativeValueMap<A>
173+
where
174+
A: Aggregator,
175+
<A as Aggregator>::InitConfig: Send + Sync,
176+
{
177+
const TEMPORALITY: Temporality = Temporality::Cumulative;
178+
179+
type Aggr = A;
180+
181+
fn measure(
182+
&self,
183+
value: <Self::Aggr as Aggregator>::PreComputedValue,
184+
attributes: &[KeyValue],
185+
) {
186+
self.0.measure(value, attributes);
187+
}
188+
189+
fn collect_data_points<DP, MapFn>(&self, dest: &mut Vec<DP>, map_fn: MapFn)
190+
where
191+
MapFn: FnMut(Vec<KeyValue>, &Self::Aggr) -> DP,
192+
{
193+
self.0.collect_readonly(dest, map_fn);
194+
}
195+
}

0 commit comments

Comments
 (0)