Skip to content

Commit b2f3c8b

Browse files
authored
Unifty httpd in blackholes (#1187)
### What does this PR do? This commit unifies the httpd bits of the blackholes, removing a duplicated loop in three places. The type logic here started out general and got gradually more concrete and there's scope to reduce duplication even further but I don't know that it's a pressing issue.
1 parent c7aa607 commit b2f3c8b

File tree

5 files changed

+303
-236
lines changed

5 files changed

+303
-236
lines changed

lading/src/blackhole.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
88
use serde::{Deserialize, Serialize};
99

10+
mod common;
1011
pub mod http;
1112
pub mod splunk_hec;
1213
pub mod sqs;

lading/src/blackhole/common.rs

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
use bytes::Bytes;
2+
use http_body_util::combinators::BoxBody;
3+
use hyper::service::Service;
4+
use hyper_util::{
5+
rt::{TokioExecutor, TokioIo},
6+
server::conn::auto,
7+
};
8+
use lading_signal::Watcher;
9+
use std::{net::SocketAddr, sync::Arc};
10+
use tokio::{
11+
net::TcpListener,
12+
pin,
13+
sync::{Semaphore, TryAcquireError},
14+
task::JoinSet,
15+
};
16+
use tracing::{debug, error, info, warn};
17+
18+
#[derive(thiserror::Error, Debug)]
19+
pub enum Error {
20+
/// Wrapper for [`std::io::Error`].
21+
#[error("IO error: {0}")]
22+
Io(std::io::Error),
23+
}
24+
25+
pub(crate) async fn run_httpd<SF, S>(
26+
addr: SocketAddr,
27+
concurrency_limit: usize,
28+
shutdown: Watcher,
29+
make_service: SF,
30+
) -> Result<(), Error>
31+
where
32+
// "service factory"
33+
SF: Send + Sync + 'static + Clone + Fn() -> S,
34+
// The bounds on `S` per
35+
// https://docs.rs/hyper/latest/hyper/service/trait.Service.html and then
36+
// made concrete per
37+
// https://docs.rs/hyper-util/latest/hyper_util/server/conn/auto/struct.Builder.html#method.serve_connection
38+
S: Service<
39+
hyper::Request<hyper::body::Incoming>,
40+
Response = hyper::Response<BoxBody<Bytes, hyper::Error>>,
41+
Error = hyper::Error,
42+
> + Send
43+
+ 'static,
44+
45+
S::Future: Send + 'static,
46+
{
47+
let listener = TcpListener::bind(addr).await.map_err(Error::Io)?;
48+
let sem = Arc::new(Semaphore::new(concurrency_limit));
49+
let mut join_set = JoinSet::new();
50+
51+
let shutdown_fut = shutdown.recv();
52+
pin!(shutdown_fut);
53+
loop {
54+
tokio::select! {
55+
() = &mut shutdown_fut => {
56+
info!("Shutdown signal received, stopping accept loop.");
57+
break;
58+
}
59+
60+
incoming = listener.accept() => {
61+
let (stream, addr) = match incoming {
62+
Ok(sa) => sa,
63+
Err(e) => {
64+
error!("Error accepting connection: {e}");
65+
continue;
66+
}
67+
};
68+
debug!("Accepted connection from {addr}");
69+
70+
let sem = Arc::clone(&sem);
71+
let service_factory = make_service.clone();
72+
73+
join_set.spawn(async move {
74+
// NOTE we are paying the cost for allocating a socket et al
75+
// here and then immediately dropping the connection. If we
76+
// wanted to be more resource spare we should not accept the
77+
// connection before the semaphore is known to have capacity.
78+
//
79+
// Doesn't matter really for lading -- so far as we can tell
80+
// -- but it's not strictly speaking good behavior.
81+
let permit = match sem.try_acquire() {
82+
Ok(p) => p,
83+
Err(TryAcquireError::Closed) => {
84+
error!("Semaphore closed");
85+
return;
86+
}
87+
Err(TryAcquireError::NoPermits) => {
88+
warn!("httpd over connection capacity, load shedding");
89+
drop(stream);
90+
return;
91+
}
92+
};
93+
94+
let builder = auto::Builder::new(TokioExecutor::new());
95+
let serve_future = builder.serve_connection_with_upgrades(
96+
TokioIo::new(stream),
97+
service_factory(),
98+
);
99+
100+
if let Err(e) = serve_future.await {
101+
error!("Error serving {addr}: {e}");
102+
}
103+
drop(permit);
104+
});
105+
}
106+
}
107+
}
108+
109+
drop(listener);
110+
while join_set.join_next().await.is_some() {}
111+
Ok(())
112+
}

lading/src/blackhole/http.rs

Lines changed: 31 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -6,20 +6,14 @@
66
//! `requests_received`: Total requests received
77
//!
88
9-
use std::{net::SocketAddr, sync::Arc, time::Duration};
10-
119
use bytes::Bytes;
1210
use http::{header::InvalidHeaderValue, status::InvalidStatusCode, HeaderMap};
1311
use http_body_util::{combinators::BoxBody, BodyExt};
14-
use hyper::{header, service::service_fn, Request, Response, StatusCode};
15-
use hyper_util::{
16-
rt::{TokioExecutor, TokioIo},
17-
server::conn::auto,
18-
};
12+
use hyper::{header, Request, Response, StatusCode};
1913
use metrics::counter;
2014
use serde::{Deserialize, Serialize};
21-
use tokio::{pin, sync::Semaphore, task::JoinSet};
22-
use tracing::{debug, error, info};
15+
use std::{net::SocketAddr, time::Duration};
16+
use tracing::{debug, error};
2317

2418
use super::General;
2519

@@ -42,9 +36,9 @@ pub enum Error {
4236
/// Failed to deserialize the configuration.
4337
#[error("Failed to deserialize the configuration: {0}")]
4438
Serde(#[from] serde_json::Error),
45-
/// Wrapper for [`std::io::Error`].
46-
#[error("IO error: {0}")]
47-
Io(#[from] std::io::Error),
39+
/// Wrapper for [`crate::blackhole::common::Error`].
40+
#[error(transparent)]
41+
Common(#[from] crate::blackhole::common::Error),
4842
}
4943

5044
/// Body variant supported by this blackhole.
@@ -240,72 +234,32 @@ impl Http {
240234
/// Function will return an error if the configuration is invalid or if
241235
/// receiving a packet fails.
242236
pub async fn run(self) -> Result<(), Error> {
243-
let listener = tokio::net::TcpListener::bind(self.httpd_addr).await?;
244-
let sem = Arc::new(Semaphore::new(self.concurrency_limit));
245-
let mut join_set = JoinSet::new();
246-
247-
let shutdown = self.shutdown.recv();
248-
pin!(shutdown);
249-
loop {
250-
tokio::select! {
251-
() = &mut shutdown => {
252-
info!("shutdown signal received");
253-
break;
254-
}
255-
256-
incoming = listener.accept() => {
257-
let (stream, addr) = match incoming {
258-
Ok((s,a)) => (s,a),
259-
Err(e) => {
260-
error!("accept error: {e}");
261-
continue;
262-
}
263-
};
264-
265-
let metric_labels = self.metric_labels.clone();
266-
let body_bytes = self.body_bytes.clone();
267-
let headers = self.headers.clone();
268-
let status = self.status;
269-
let response_delay = self.response_delay;
270-
let sem = Arc::clone(&sem);
271-
272-
join_set.spawn(async move {
273-
debug!("Accepted connection from {addr}");
274-
let permit = match sem.acquire_owned().await {
275-
Ok(p) => p,
276-
Err(e) => {
277-
error!("Semaphore closed: {e}");
278-
return;
279-
}
280-
};
281-
282-
let builder = auto::Builder::new(TokioExecutor::new());
283-
let serve_future = builder
284-
.serve_connection(
285-
TokioIo::new(stream),
286-
service_fn(move |req: Request<hyper::body::Incoming>| {
287-
debug!("REQUEST: {:?}", req);
288-
srv(
289-
status,
290-
metric_labels.clone(),
291-
body_bytes.clone(),
292-
req,
293-
headers.clone(),
294-
response_delay,
295-
)
296-
})
297-
);
237+
crate::blackhole::common::run_httpd(
238+
self.httpd_addr,
239+
self.concurrency_limit,
240+
self.shutdown,
241+
move || {
242+
let metric_labels = self.metric_labels.clone();
243+
let body_bytes = self.body_bytes.clone();
244+
let headers = self.headers.clone();
245+
let status = self.status;
246+
let response_delay = self.response_delay;
247+
248+
hyper::service::service_fn(move |req| {
249+
debug!("REQUEST: {:?}", req);
250+
srv(
251+
status,
252+
metric_labels.clone(),
253+
body_bytes.clone(),
254+
req,
255+
headers.clone(),
256+
response_delay,
257+
)
258+
})
259+
},
260+
)
261+
.await?;
298262

299-
if let Err(e) = serve_future.await {
300-
error!("Error serving {addr}: {e}");
301-
}
302-
drop(permit);
303-
});
304-
}
305-
}
306-
}
307-
drop(listener);
308-
while join_set.join_next().await.is_some() {}
309263
Ok(())
310264
}
311265
}

0 commit comments

Comments
 (0)