Skip to content

Commit

Permalink
Instrument connection limit and current count for http blackholes (#1190
Browse files Browse the repository at this point in the history
)

### What does this PR do?

This commit introduces telemetry keeping track of the connection
limit for http blackholes and the current connection count. This is
desirable information for targets that wish to trade total connections
for other resources.
  • Loading branch information
blt authored Dec 31, 2024
1 parent b2f3c8b commit 2d11692
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 20 deletions.
7 changes: 7 additions & 0 deletions lading/src/blackhole/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use hyper_util::{
server::conn::auto,
};
use lading_signal::Watcher;
use metrics::gauge;
use std::{net::SocketAddr, sync::Arc};
use tokio::{
net::TcpListener,
Expand All @@ -26,6 +27,7 @@ pub(crate) async fn run_httpd<SF, S>(
addr: SocketAddr,
concurrency_limit: usize,
shutdown: Watcher,
labels: Vec<(String, String)>,
make_service: SF,
) -> Result<(), Error>
where
Expand All @@ -48,9 +50,14 @@ where
let sem = Arc::new(Semaphore::new(concurrency_limit));
let mut join_set = JoinSet::new();

gauge!("connection.limit", &labels).set(concurrency_limit as f64);

let shutdown_fut = shutdown.recv();
pin!(shutdown_fut);
loop {
let claimed_permits = concurrency_limit - sem.available_permits();

gauge!("connection.current", &labels).set(claimed_permits as f64);
tokio::select! {
() = &mut shutdown_fut => {
info!("Shutdown signal received, stopping accept loop.");
Expand Down
4 changes: 2 additions & 2 deletions lading/src/blackhole/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use hyper::{header, Request, Response, StatusCode};
use metrics::counter;
use serde::{Deserialize, Serialize};
use std::{net::SocketAddr, time::Duration};
use tracing::{debug, error};
use tracing::error;

use super::General;

Expand Down Expand Up @@ -238,6 +238,7 @@ impl Http {
self.httpd_addr,
self.concurrency_limit,
self.shutdown,
self.metric_labels.clone(),
move || {
let metric_labels = self.metric_labels.clone();
let body_bytes = self.body_bytes.clone();
Expand All @@ -246,7 +247,6 @@ impl Http {
let response_delay = self.response_delay;

hyper::service::service_fn(move |req| {
debug!("REQUEST: {:?}", req);
srv(
status,
metric_labels.clone(),
Expand Down
18 changes: 5 additions & 13 deletions lading/src/blackhole/splunk_hec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,7 @@
use std::{
net::SocketAddr,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
sync::atomic::{AtomicU64, Ordering},
};

use bytes::Bytes;
Expand All @@ -20,7 +17,6 @@ use hyper::{header, Method, Request, Response, StatusCode};
use metrics::counter;
use rustc_hash::FxHashMap;
use serde::{Deserialize, Serialize};
use tracing::debug;

use super::General;

Expand Down Expand Up @@ -90,7 +86,7 @@ struct HecResponse {

async fn srv(
req: Request<hyper::body::Incoming>,
labels: Arc<Vec<(String, String)>>,
labels: Vec<(String, String)>,
) -> Result<hyper::Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
counter!("requests_received", &*labels).increment(1);

Expand Down Expand Up @@ -203,18 +199,14 @@ impl SplunkHec {
///
/// None known.
pub async fn run(self) -> Result<(), Error> {
let metric_labels = Arc::new(self.metric_labels.clone());

crate::blackhole::common::run_httpd(
self.httpd_addr,
self.concurrency_limit,
self.shutdown,
self.metric_labels.clone(),
move || {
let metric_labels = Arc::clone(&metric_labels);
hyper::service::service_fn(move |req| {
debug!("REQUEST: {:?}", req);
srv(req, Arc::clone(&metric_labels))
})
let metric_labels = self.metric_labels.clone();
hyper::service::service_fn(move |req| srv(req, metric_labels.clone()))
},
)
.await?;
Expand Down
7 changes: 2 additions & 5 deletions lading/src/blackhole/sqs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,10 @@ impl Sqs {
self.httpd_addr,
self.concurrency_limit,
self.shutdown,
self.metric_labels.clone(),
move || {
let metric_labels = self.metric_labels.clone();

hyper::service::service_fn(move |req| {
debug!("REQUEST: {:?}", req);
srv(req, metric_labels.clone())
})
hyper::service::service_fn(move |req| srv(req, metric_labels.clone()))
},
)
.await?;
Expand Down

0 comments on commit 2d11692

Please sign in to comment.