Skip to content

Commit 049e01b

Browse files
cratelyn and olix0r authored
feat(app): Backend frame count metrics (#3308)
* feat(app): Backend response frame count metrics

  this introduces a new tower middleware for Prometheus metrics, used for instrumenting HTTP and gRPC response bodies, and observing (a) the number of frames yielded by a body, (b) the number of bytes included in body frames, and (c) the distribution of frame sizes.

  this middleware allows operators to reason about how large or small the packets being served in a backend's response bodies are.

  a route-level middleware that instruments request bodies will be added in a follow-on PR.

  ### 📝 changes

  an overview of changes made here:

  * the `linkerd-http-prom` crate has a new `body_data` submodule. it exposes `request` and `response` halves, to be explicit about which body is being instrumented on a `tower::Service`.
  * the `linkerd-http-prom` crate now has a new dependency: `bytes` is added in order to inspect the data chunk when the inner body yields a new frame. `futures-util` and `http-body` are added as dev-dependencies of the outbound app crate (`linkerd/app/outbound/Cargo.toml`) for the accompanying test coverage.
  * body metrics are affixed to the `RouteBackendMetrics<L>` structure, and registered at startup.

  Signed-off-by: katelyn martin <[email protected]>

* review: Inline attribute to service passthrough

  Signed-off-by: katelyn martin <[email protected]>

* review: Inline attribute to body passthrough

  continuing this theme of inlining, we inline the passthrough methods on `Body` as well.
  Signed-off-by: katelyn martin <[email protected]>

* review: Box `<RecordBodyData as Service>::Future` values

  Signed-off-by: katelyn martin <[email protected]>

* review: rename `get()` to `metrics()`

  Signed-off-by: katelyn martin <[email protected]>

* review: simplify `RecordBodyData<S>` response

  Signed-off-by: katelyn martin <[email protected]>

* Update ResponseBody metrics to use a histogram

  Signed-off-by: Oliver Gould <[email protected]>

* refactor(tests): feature gate frame size histogram assertions

  see:
  * prometheus/client_rust#242
  * prometheus/client_rust#241

  for now, refactor this test so that it gates all use of the (proposed) `sum()` and `count()` accessors behind a temporary feature gate.

  Signed-off-by: katelyn martin <[email protected]>

---------

Signed-off-by: katelyn martin <[email protected]>
Signed-off-by: Oliver Gould <[email protected]>
Co-authored-by: Oliver Gould <[email protected]>
1 parent 5df46ac commit 049e01b

File tree

12 files changed

+457
-7
lines changed

12 files changed

+457
-7
lines changed

Cargo.lock

+3
Original file line numberDiff line numberDiff line change
@@ -1506,7 +1506,9 @@ dependencies = [
15061506
"ahash",
15071507
"bytes",
15081508
"futures",
1509+
"futures-util",
15091510
"http",
1511+
"http-body",
15101512
"hyper",
15111513
"linkerd-app-core",
15121514
"linkerd-app-test",
@@ -1744,6 +1746,7 @@ dependencies = [
17441746
name = "linkerd-http-prom"
17451747
version = "0.1.0"
17461748
dependencies = [
1749+
"bytes",
17471750
"futures",
17481751
"http",
17491752
"http-body",

linkerd/app/outbound/Cargo.toml

+2
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ linkerd-tonic-stream = { path = "../../tonic-stream" }
5050
linkerd-tonic-watch = { path = "../../tonic-watch" }
5151

5252
[dev-dependencies]
53+
futures-util = "0.3"
54+
http-body = "0.4"
5355
hyper = { version = "0.14", features = ["http1", "http2"] }
5456
tokio = { version = "1", features = ["macros", "sync", "time"] }
5557
tokio-rustls = "0.24"

linkerd/app/outbound/src/http/logical/policy/route/backend/metrics.rs

+51-6
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use crate::{BackendRef, ParentRef, RouteRef};
22
use linkerd_app_core::{metrics::prom, svc};
33
use linkerd_http_prom::{
4+
body_data::response::{BodyDataMetrics, NewRecordBodyData, ResponseBodyFamilies},
45
record_response::{self, NewResponseDuration, StreamLabel},
56
NewCountRequests, RequestCount, RequestCountFamilies,
67
};
@@ -15,6 +16,7 @@ mod tests;
1516
pub struct RouteBackendMetrics<L: StreamLabel> {
1617
requests: RequestCountFamilies<labels::RouteBackend>,
1718
responses: ResponseMetrics<L>,
19+
body_metrics: ResponseBodyFamilies<labels::RouteBackend>,
1820
}
1921

2022
type ResponseMetrics<L> = record_response::ResponseMetrics<
@@ -26,14 +28,24 @@ pub fn layer<T, N>(
2628
metrics: &RouteBackendMetrics<T::StreamLabel>,
2729
) -> impl svc::Layer<
2830
N,
29-
Service = NewCountRequests<
30-
ExtractRequestCount,
31-
NewResponseDuration<T, ExtractRecordDurationParams<ResponseMetrics<T::StreamLabel>>, N>,
31+
Service = NewRecordBodyData<
32+
ExtractRecordBodyDataParams,
33+
NewCountRequests<
34+
ExtractRequestCount,
35+
NewResponseDuration<T, ExtractRecordDurationParams<ResponseMetrics<T::StreamLabel>>, N>,
36+
>,
3237
>,
3338
> + Clone
3439
where
3540
T: MkStreamLabel,
3641
N: svc::NewService<T>,
42+
NewRecordBodyData<
43+
ExtractRecordBodyDataParams,
44+
NewCountRequests<
45+
ExtractRequestCount,
46+
NewResponseDuration<T, ExtractRecordDurationParams<ResponseMetrics<T::StreamLabel>>, N>,
47+
>,
48+
>: svc::NewService<T>,
3749
NewCountRequests<
3850
ExtractRequestCount,
3951
NewResponseDuration<T, ExtractRecordDurationParams<ResponseMetrics<T::StreamLabel>>, N>,
@@ -44,28 +56,37 @@ where
4456
let RouteBackendMetrics {
4557
requests,
4658
responses,
59+
body_metrics,
4760
} = metrics.clone();
61+
4862
svc::layer::mk(move |inner| {
4963
use svc::Layer;
50-
NewCountRequests::layer_via(ExtractRequestCount(requests.clone())).layer(
51-
NewRecordDuration::layer_via(ExtractRecordDurationParams(responses.clone()))
52-
.layer(inner),
64+
NewRecordBodyData::layer_via(ExtractRecordBodyDataParams(body_metrics.clone())).layer(
65+
NewCountRequests::layer_via(ExtractRequestCount(requests.clone())).layer(
66+
NewRecordDuration::layer_via(ExtractRecordDurationParams(responses.clone()))
67+
.layer(inner),
68+
),
5369
)
5470
})
5571
}
5672

5773
#[derive(Clone, Debug)]
5874
pub struct ExtractRequestCount(RequestCountFamilies<labels::RouteBackend>);
5975

76+
#[derive(Clone, Debug)]
77+
pub struct ExtractRecordBodyDataParams(ResponseBodyFamilies<labels::RouteBackend>);
78+
6079
// === impl RouteBackendMetrics ===
6180

6281
impl<L: StreamLabel> RouteBackendMetrics<L> {
6382
pub fn register(reg: &mut prom::Registry, histo: impl IntoIterator<Item = f64>) -> Self {
6483
let requests = RequestCountFamilies::register(reg);
6584
let responses = record_response::ResponseMetrics::register(reg, histo);
85+
let body_metrics = ResponseBodyFamilies::register(reg);
6686
Self {
6787
requests,
6888
responses,
89+
body_metrics,
6990
}
7091
}
7192

@@ -83,13 +104,22 @@ impl<L: StreamLabel> RouteBackendMetrics<L> {
83104
pub(crate) fn get_statuses(&self, l: &L::StatusLabels) -> prom::Counter {
84105
self.responses.get_statuses(l)
85106
}
107+
108+
#[cfg(test)]
109+
pub(crate) fn get_response_body_metrics(
110+
&self,
111+
l: &labels::RouteBackend,
112+
) -> linkerd_http_prom::body_data::response::BodyDataMetrics {
113+
self.body_metrics.metrics(l)
114+
}
86115
}
87116

88117
impl<L: StreamLabel> Default for RouteBackendMetrics<L> {
89118
fn default() -> Self {
90119
Self {
91120
requests: Default::default(),
92121
responses: Default::default(),
122+
body_metrics: Default::default(),
93123
}
94124
}
95125
}
@@ -99,6 +129,7 @@ impl<L: StreamLabel> Clone for RouteBackendMetrics<L> {
99129
Self {
100130
requests: self.requests.clone(),
101131
responses: self.responses.clone(),
132+
body_metrics: self.body_metrics.clone(),
102133
}
103134
}
104135
}
@@ -114,3 +145,17 @@ where
114145
.metrics(&labels::RouteBackend(t.param(), t.param(), t.param()))
115146
}
116147
}
148+
149+
// === impl ExtractRecordBodyDataParams ===
150+
151+
impl<T> svc::ExtractParam<BodyDataMetrics, T> for ExtractRecordBodyDataParams
152+
where
153+
T: svc::Param<ParentRef> + svc::Param<RouteRef> + svc::Param<BackendRef>,
154+
{
155+
fn extract_param(&self, t: &T) -> BodyDataMetrics {
156+
let Self(families) = self;
157+
let labels = labels::RouteBackend(t.param(), t.param(), t.param());
158+
159+
families.metrics(&labels)
160+
}
161+
}

linkerd/app/outbound/src/http/logical/policy/route/backend/metrics/tests.rs

+141
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,11 @@ use super::{
55
LabelGrpcRouteBackendRsp, LabelHttpRouteBackendRsp, RouteBackendMetrics,
66
};
77
use crate::http::{concrete, logical::Concrete};
8+
use bytes::Buf;
89
use linkerd_app_core::{
910
svc::{self, http::BoxBody, Layer, NewService},
1011
transport::{Remote, ServerAddr},
12+
Error,
1113
};
1214
use linkerd_proxy_client_policy as policy;
1315

@@ -114,6 +116,145 @@ async fn http_request_statuses() {
114116
assert_eq!(mixed.get(), 1);
115117
}
116118

119+
/// Tests that metrics count frames in the backend response body.
120+
#[tokio::test(flavor = "current_thread", start_paused = true)]
121+
async fn body_data_layer_records_frames() -> Result<(), Error> {
122+
use http_body::Body;
123+
use linkerd_app_core::proxy::http;
124+
use linkerd_http_prom::body_data::response::BodyDataMetrics;
125+
use tower::{Service, ServiceExt};
126+
127+
let _trace = linkerd_tracing::test::trace_init();
128+
129+
let metrics = super::RouteBackendMetrics::default();
130+
let parent_ref = crate::ParentRef(policy::Meta::new_default("parent"));
131+
let route_ref = crate::RouteRef(policy::Meta::new_default("route"));
132+
let backend_ref = crate::BackendRef(policy::Meta::new_default("backend"));
133+
134+
let (mut svc, mut handle) =
135+
mock_http_route_backend_metrics(&metrics, &parent_ref, &route_ref, &backend_ref);
136+
handle.allow(1);
137+
138+
// Create a request.
139+
let req = {
140+
let empty = hyper::Body::empty();
141+
let body = BoxBody::new(empty);
142+
http::Request::builder().method("DOOT").body(body).unwrap()
143+
};
144+
145+
// Call the service once it is ready to accept a request.
146+
tracing::info!("calling service");
147+
svc.ready().await.expect("ready");
148+
let call = svc.call(req);
149+
let (req, send_resp) = handle.next_request().await.unwrap();
150+
debug_assert_eq!(req.method().as_str(), "DOOT");
151+
152+
// Acquire the counters for this backend.
153+
tracing::info!("acquiring response body metrics");
154+
let labels = labels::RouteBackend(parent_ref.clone(), route_ref.clone(), backend_ref.clone());
155+
let BodyDataMetrics {
156+
// TODO(kate): currently, histograms do not expose their observation count or sum. so,
157+
// we're left unable to exercise these metrics until prometheus/client_rust#242 lands.
158+
// - https://github.com/prometheus/client_rust/pull/241
159+
// - https://github.com/prometheus/client_rust/pull/242
160+
#[cfg(feature = "prometheus-client-rust-242")]
161+
frame_size,
162+
..
163+
} = metrics.get_response_body_metrics(&labels);
164+
165+
// Before we've sent a response, the counter should be zero.
166+
#[cfg(feature = "prometheus-client-rust-242")]
167+
{
168+
assert_eq!(frame_size.count(), 0);
169+
assert_eq!(frame_size.sum(), 0);
170+
}
171+
172+
// Create a response whose body is backed by a channel that we can send chunks to, send it.
173+
tracing::info!("sending response");
174+
let mut resp_tx = {
175+
let (tx, body) = hyper::Body::channel();
176+
let body = BoxBody::new(body);
177+
let resp = http::Response::builder()
178+
.status(http::StatusCode::IM_A_TEAPOT)
179+
.body(body)
180+
.unwrap();
181+
send_resp.send_response(resp);
182+
tx
183+
};
184+
185+
// Before we've sent any bytes, the counter should be zero.
186+
#[cfg(feature = "prometheus-client-rust-242")]
187+
{
188+
assert_eq!(frame_size.count(), 0);
189+
assert_eq!(frame_size.sum(), 0);
190+
}
191+
192+
// On the client end, poll our call future and await the response.
193+
tracing::info!("polling service future");
194+
let (parts, body) = call.await?.into_parts();
195+
debug_assert_eq!(parts.status, 418);
196+
197+
let mut body = Box::pin(body);
198+
199+
/// Returns the next chunk from a boxed body.
200+
async fn read_chunk(body: &mut std::pin::Pin<Box<BoxBody>>) -> Result<Vec<u8>, Error> {
201+
use std::task::{Context, Poll};
202+
let mut ctx = Context::from_waker(futures_util::task::noop_waker_ref());
203+
let data = match body.as_mut().poll_data(&mut ctx) {
204+
Poll::Ready(Some(Ok(d))) => d,
205+
_ => panic!("next chunk should be ready"),
206+
};
207+
let chunk = data.chunk().to_vec();
208+
Ok(chunk)
209+
}
210+
211+
{
212+
// Send a chunk, confirm that our counters are incremented.
213+
tracing::info!("sending first chunk");
214+
resp_tx.send_data("hello".into()).await?;
215+
let chunk = read_chunk(&mut body).await?;
216+
debug_assert_eq!("hello".as_bytes(), chunk, "should get same value back out");
217+
#[cfg(feature = "prometheus-client-rust-242")]
218+
assert_eq!(frame_size.count(), 1);
219+
#[cfg(feature = "prometheus-client-rust-242")]
220+
assert_eq!(frame_size.sum(), 5);
221+
}
222+
223+
{
224+
// Send another chunk, confirm that our counters are incremented once more.
225+
tracing::info!("sending second chunk");
226+
resp_tx.send_data(", world!".into()).await?;
227+
let chunk = read_chunk(&mut body).await?;
228+
debug_assert_eq!(
229+
", world!".as_bytes(),
230+
chunk,
231+
"should get same value back out"
232+
);
233+
#[cfg(feature = "prometheus-client-rust-242")]
234+
assert_eq!(frame_size.count(), 2);
235+
#[cfg(feature = "prometheus-client-rust-242")]
236+
assert_eq!(frame_size.sum(), 5 + 8);
237+
}
238+
239+
{
240+
// Close the body, show that the counters remain at the same values.
241+
use std::task::{Context, Poll};
242+
tracing::info!("closing response body");
243+
drop(resp_tx);
244+
let mut ctx = Context::from_waker(futures_util::task::noop_waker_ref());
245+
match body.as_mut().poll_data(&mut ctx) {
246+
Poll::Ready(None) => {}
247+
_ => panic!("got unexpected poll result"),
248+
};
249+
#[cfg(feature = "prometheus-client-rust-242")]
250+
assert_eq!(frame_size.count(), 2);
251+
#[cfg(feature = "prometheus-client-rust-242")]
252+
assert_eq!(frame_size.sum(), 5 + 8);
253+
}
254+
255+
Ok(())
256+
}
257+
117258
#[tokio::test(flavor = "current_thread", start_paused = true)]
118259
async fn grpc_request_statuses_ok() {
119260
let _trace = linkerd_tracing::test::trace_init();

linkerd/http/prom/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ Tower middleware for Prometheus metrics.
1313
test-util = []
1414

1515
[dependencies]
16+
bytes = "1"
1617
futures = { version = "0.3", default-features = false }
1718
http = "0.2"
1819
http-body = "0.4"

linkerd/http/prom/src/body_data.rs

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pub mod request;
2+
pub mod response;
3+
4+
mod body;
5+
mod metrics;

0 commit comments

Comments
 (0)