From 2d11692fa4dbc88fe2a0719d4bbb510191bf5898 Mon Sep 17 00:00:00 2001 From: "Brian L. Troutwine" Date: Tue, 31 Dec 2024 09:35:32 -0800 Subject: [PATCH] Instrument connection limit and current count for http blackholes (#1190) ### What does this PR do? This commit introduces telemetry keeping track of the connection limit for http blackholes and the current connection count. This is desirable information for targets that wish to trade total connections for other resources. --- lading/src/blackhole/common.rs | 7 +++++++ lading/src/blackhole/http.rs | 4 ++-- lading/src/blackhole/splunk_hec.rs | 18 +++++------------- lading/src/blackhole/sqs.rs | 7 ++----- 4 files changed, 16 insertions(+), 20 deletions(-) diff --git a/lading/src/blackhole/common.rs b/lading/src/blackhole/common.rs index 1dc1bacb7..e1b85d6ef 100644 --- a/lading/src/blackhole/common.rs +++ b/lading/src/blackhole/common.rs @@ -6,6 +6,7 @@ use hyper_util::{ server::conn::auto, }; use lading_signal::Watcher; +use metrics::gauge; use std::{net::SocketAddr, sync::Arc}; use tokio::{ net::TcpListener, @@ -26,6 +27,7 @@ pub(crate) async fn run_httpd( addr: SocketAddr, concurrency_limit: usize, shutdown: Watcher, + labels: Vec<(String, String)>, make_service: SF, ) -> Result<(), Error> where @@ -48,9 +50,14 @@ where let sem = Arc::new(Semaphore::new(concurrency_limit)); let mut join_set = JoinSet::new(); + gauge!("connection.limit", &labels).set(concurrency_limit as f64); + let shutdown_fut = shutdown.recv(); pin!(shutdown_fut); loop { + let claimed_permits = concurrency_limit - sem.available_permits(); + + gauge!("connection.current", &labels).set(claimed_permits as f64); tokio::select! { () = &mut shutdown_fut => { info!("Shutdown signal received, stopping accept loop."); diff --git a/lading/src/blackhole/http.rs b/lading/src/blackhole/http.rs index e4998a387..53b6bf0ea 100644 --- a/lading/src/blackhole/http.rs +++ b/lading/src/blackhole/http.rs @@ -13,7 +13,7 @@ use hyper::{header, Request, Response, StatusCode}; use metrics::counter; use serde::{Deserialize, Serialize}; use std::{net::SocketAddr, time::Duration}; -use tracing::{debug, error}; +use tracing::error; use super::General; @@ -238,6 +238,7 @@ impl Http { self.httpd_addr, self.concurrency_limit, self.shutdown, + self.metric_labels.clone(), move || { let metric_labels = self.metric_labels.clone(); let body_bytes = self.body_bytes.clone(); @@ -246,7 +247,6 @@ impl Http { let response_delay = self.response_delay; hyper::service::service_fn(move |req| { - debug!("REQUEST: {:?}", req); srv( status, metric_labels.clone(), diff --git a/lading/src/blackhole/splunk_hec.rs b/lading/src/blackhole/splunk_hec.rs index 182319d43..0de7d0284 100644 --- a/lading/src/blackhole/splunk_hec.rs +++ b/lading/src/blackhole/splunk_hec.rs @@ -8,10 +8,7 @@ use std::{ net::SocketAddr, - sync::{ - atomic::{AtomicU64, Ordering}, - Arc, - }, + sync::atomic::{AtomicU64, Ordering}, }; use bytes::Bytes; @@ -20,7 +17,6 @@ use hyper::{header, Method, Request, Response, StatusCode}; use metrics::counter; use rustc_hash::FxHashMap; use serde::{Deserialize, Serialize}; -use tracing::debug; use super::General; @@ -90,7 +86,7 @@ struct HecResponse { async fn srv( req: Request, - labels: Arc>, + labels: Vec<(String, String)>, ) -> Result>, hyper::Error> { counter!("requests_received", &*labels).increment(1); @@ -203,18 +199,14 @@ impl SplunkHec { /// /// None known. pub async fn run(self) -> Result<(), Error> { - let metric_labels = Arc::new(self.metric_labels.clone()); - crate::blackhole::common::run_httpd( self.httpd_addr, self.concurrency_limit, self.shutdown, + self.metric_labels.clone(), move || { - let metric_labels = Arc::clone(&metric_labels); - hyper::service::service_fn(move |req| { - debug!("REQUEST: {:?}", req); - srv(req, Arc::clone(&metric_labels)) - }) + let metric_labels = self.metric_labels.clone(); + hyper::service::service_fn(move |req| srv(req, metric_labels.clone())) }, ) .await?; diff --git a/lading/src/blackhole/sqs.rs b/lading/src/blackhole/sqs.rs index b94512bfc..9ea3e91e3 100644 --- a/lading/src/blackhole/sqs.rs +++ b/lading/src/blackhole/sqs.rs @@ -101,13 +101,10 @@ impl Sqs { self.httpd_addr, self.concurrency_limit, self.shutdown, + self.metric_labels.clone(), move || { let metric_labels = self.metric_labels.clone(); - - hyper::service::service_fn(move |req| { - debug!("REQUEST: {:?}", req); - srv(req, metric_labels.clone()) - }) + hyper::service::service_fn(move |req| srv(req, metric_labels.clone())) }, ) .await?;