diff --git a/Cargo.lock b/Cargo.lock index 2194e28..eda6b78 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -66,6 +66,12 @@ version = "1.0.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "55cc3b69f167a1ef2e161439aa98aed94e6028e5f9a59be9a6ffb47aef1651f9" +[[package]] +name = "anyhow" +version = "1.0.95" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34ac096ce696dc2fcabef30516bb13c0a68a11d30131d3df6f04711467681b04" + [[package]] name = "autocfg" version = "1.4.0" @@ -1357,11 +1363,13 @@ name = "metrics_cloudwatch" version = "4.0.0" dependencies = [ "ahash", + "anyhow", "aws-config", "aws-sdk-cloudwatch", "aws-smithy-types-convert", "chrono", "criterion", + "flate2", "futures-util", "log", "metrics", diff --git a/Cargo.toml b/Cargo.toml index a9e055c..1a42233 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,13 +26,19 @@ metrics-util = { version = "0.19", features = ["registry"] } ordered-float = "4" rand = { version = "0.8", default-features = false, features = [ "small_rng", "std", "std_rng", ] } tokio = { version = "1", features = ["sync", "time"] } +flate2 = { version = "1", optional = true } + [dev-dependencies] +anyhow = "1" aws-config = "1" criterion = { version = "0.5", features = ["async_tokio"] } proptest = "1" tokio = { version = "1", features = ["test-util", "macros", "rt-multi-thread"] } +[features] +gzip = ["flate2"] +integration-test = ["gzip"] [[bench]] name = "bench" diff --git a/benches/bench.rs b/benches/bench.rs index 161a298..526debf 100644 --- a/benches/bench.rs +++ b/benches/bench.rs @@ -22,20 +22,16 @@ fn simple(c: &mut Criterion) { let cloudwatch_client = common::MockCloudWatchClient::default(); let (_shutdown_sender, receiver) = tokio::sync::oneshot::channel::<()>(); - let (recorder, task) = collector::new( - cloudwatch_client, - collector::Config { - cloudwatch_namespace: "".into(), - default_dimensions: Default::default(), - storage_resolution: collector::Resolution::Second, - send_interval_secs: 200, - send_timeout_secs: 10, - shutdown_signal: receiver.map(|_| ()).boxed().shared(), - metric_buffer_size: 1024, - force_flush_stream: Some(Box::pin(futures_util::stream::empty())), - }, - ); - metrics::set_global_recorder(recorder).unwrap(); + let task = metrics_cloudwatch::Builder::new() + .cloudwatch_namespace("") + .storage_resolution(collector::Resolution::Second) + .send_interval_secs(200) + .send_timeout_secs(10) + .shutdown_signal(receiver.map(|_| ()).boxed()) + .metric_buffer_size(1024) + .init_future_mock(cloudwatch_client, metrics::set_global_recorder) + .await + .unwrap(); tokio::spawn(task); }); @@ -71,8 +67,10 @@ fn simple(c: &mut Criterion) { send_timeout_secs: 10, shutdown_signal: receiver.map(|_| ()).boxed().shared(), metric_buffer_size: 1024, - force_flush_stream: Some(Box::pin(futures_util::stream::empty())), + #[cfg(feature = "gzip")] + gzip: false, }, + Some(Box::pin(futures_util::stream::empty())), ); let task = tokio::spawn(task); diff --git a/src/builder.rs b/src/builder.rs index 0103b46..1618d17 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -17,6 +17,9 @@ pub struct Builder { shutdown_signal: Option>, metric_buffer_size: usize, force_flush_stream: Option + Send>>>, + + #[cfg(feature = "gzip")] + gzip: bool, } fn extract_namespace(cloudwatch_namespace: Option) -> Result { @@ -39,6 +42,9 @@ impl Builder { shutdown_signal: Default::default(), metric_buffer_size: 2048, force_flush_stream: Default::default(), + + #[cfg(feature = "gzip")] + gzip: true, } } @@ -114,6 +120,14 @@ impl Builder { } } + /// Whether to gzip the payload before sending it to CloudWatch. + /// + /// Default: true + #[cfg(feature = "gzip")] + pub fn gzip(self, gzip: bool) -> Self { + Self { gzip, ..self } + } + /// Initializes the CloudWatch metrics backend and runs it in a new thread. /// /// Expects the [metrics::set_global_recorder] function as an argument as a safeguard against @@ -125,7 +139,8 @@ impl Builder { RecorderHandle, ) -> Result<(), metrics::SetRecorderError>, ) -> Result<(), Error> { - collector::init(set_global_recorder, client, self.build_config()?); + let (config, force_flush_stream) = self.build_config()?; + collector::init(set_global_recorder, client, config, force_flush_stream); Ok(()) } @@ -141,8 +156,9 @@ impl Builder { RecorderHandle, ) -> Result<(), metrics::SetRecorderError>, ) -> Result<(), Error> { + let (config, force_flush_stream) = self.build_config()?; let driver = - collector::init_future(set_global_recorder, client, self.build_config()?).await?; + collector::init_future(set_global_recorder, client, config, force_flush_stream).await?; driver.await; Ok(()) } @@ -159,7 +175,8 @@ impl Builder { RecorderHandle, ) -> Result<(), metrics::SetRecorderError>, ) -> Result, Error> { - collector::init_future(set_global_recorder, client, self.build_config()?).await + let (config, force_flush_stream) = self.build_config()?; + collector::init_future(set_global_recorder, client, config, force_flush_stream).await } #[doc(hidden)] @@ -170,23 +187,30 @@ impl Builder { RecorderHandle, ) -> Result<(), metrics::SetRecorderError>, ) -> Result, Error> { - collector::init_future(set_global_recorder, client, self.build_config()?).await - } - - fn build_config(self) -> Result { - Ok(Config { - cloudwatch_namespace: extract_namespace(self.cloudwatch_namespace)?, - default_dimensions: self.default_dimensions, - storage_resolution: self.storage_resolution.unwrap_or(Resolution::Minute), - send_interval_secs: self.send_interval_secs.unwrap_or(10), - send_timeout_secs: self.send_timeout_secs.unwrap_or(4), - shutdown_signal: self - .shutdown_signal - .unwrap_or_else(|| Box::pin(future::pending())) - .shared(), - metric_buffer_size: self.metric_buffer_size, - force_flush_stream: self.force_flush_stream, - }) + let (config, force_flush_stream) = self.build_config()?; + collector::init_future(set_global_recorder, client, config, force_flush_stream).await + } + + fn build_config( + self, + ) -> Result<(Config, Option + Send>>>), Error> { + Ok(( + Config { + cloudwatch_namespace: extract_namespace(self.cloudwatch_namespace)?, + default_dimensions: self.default_dimensions, + storage_resolution: self.storage_resolution.unwrap_or(Resolution::Minute), + send_interval_secs: self.send_interval_secs.unwrap_or(10), + send_timeout_secs: self.send_timeout_secs.unwrap_or(4), + shutdown_signal: self + .shutdown_signal + .unwrap_or_else(|| Box::pin(future::pending())) + .shared(), + metric_buffer_size: self.metric_buffer_size, + #[cfg(feature = "gzip")] + gzip: self.gzip, + }, + self.force_flush_stream, + )) } } @@ -201,16 +225,22 @@ impl fmt::Debug for Builder { shutdown_signal: _, metric_buffer_size, force_flush_stream: _, + #[cfg(feature = "gzip")] + gzip, } = self; - f.debug_struct("Builder") - .field("cloudwatch_namespace", cloudwatch_namespace) + let mut f = f.debug_struct("Builder"); + f.field("cloudwatch_namespace", cloudwatch_namespace) .field("default_dimensions", default_dimensions) .field("storage_resolution", storage_resolution) .field("send_interval_secs", send_interval_secs) .field("send_timeout_secs", send_timeout_secs) .field("shutdown_signal", &"BoxFuture") .field("metric_buffer_size", metric_buffer_size) - .field("force_flush_stream", &"dyn Stream") - .finish() + .field("force_flush_stream", &"dyn Stream"); + + #[cfg(feature = "gzip")] + f.field("gzip", gzip); + + f.finish() } } diff --git a/src/collector.rs b/src/collector.rs index adba283..6191a5c 100644 --- a/src/collector.rs +++ b/src/collector.rs @@ -34,7 +34,7 @@ use crate::{error::Error, BoxFuture}; pub trait CloudWatch { fn put_metric_data( &self, - namespace: String, + config: &Config, data: Vec, ) -> BoxFuture<'_, Result<(), SdkError>>; } @@ -42,13 +42,54 @@ pub trait CloudWatch { impl CloudWatch for Client { fn put_metric_data( &self, - namespace: String, + config: &Config, data: Vec, ) -> BoxFuture<'_, Result<(), SdkError>> { let put = self.put_metric_data(); + let namespace = config.cloudwatch_namespace.clone(); + #[cfg(feature = "gzip")] + let gzip = config.gzip; + #[allow(unused_mut)] async move { put.namespace(namespace) .set_metric_data(Some(data)) + .customize() + .map_request(move |request| { + #[cfg(feature = "gzip")] + if gzip { + use std::io::Write; + + let mut request = request; + + // If the request is already encoded upstream has most likely implemented gzipping, so don't gzip it again + if let Some(content_encoding) = request.headers().get("content-encoding") { + log::trace!( + "PutMetricData request is already encoded: {:?}", + content_encoding + ); + } else { + let body = request.body_mut(); + + if let Some(bytes) = body.bytes() { + let mut encoder = flate2::write::GzEncoder::new( + Vec::new(), + flate2::Compression::default(), + ); + encoder.write_all(bytes)?; + + let r = encoder.finish()?; + let r_len = r.len() as u64; + *body = r.into(); + request.headers_mut().insert("content-encoding", "gzip"); + request + .headers_mut() + .insert("content-length", r_len.to_string()); + } + } + return Ok::<_, std::io::Error>(request); + } + Ok::<_, std::io::Error>(request) + }) .send() .await .map(|_| ()) @@ -68,6 +109,7 @@ const MAX_HISTOGRAM_VALUES: usize = 150; const MAX_CW_METRICS_PUT_SIZE: usize = 800_000; // Docs say 1Mb but we set our max lower to be safe since we only have a heuristic const RETRY_INTERVAL: Duration = Duration::from_secs(4); +// Only public for tests/common/mock.rs pub struct Config { pub cloudwatch_namespace: String, pub default_dimensions: BTreeMap, @@ -76,7 +118,8 @@ pub struct Config { pub send_timeout_secs: u64, pub shutdown_signal: future::Shared>, pub metric_buffer_size: usize, - pub force_flush_stream: Option + Send>>>, + #[cfg(feature = "gzip")] + pub gzip: bool, } struct CollectorConfig { @@ -170,6 +213,7 @@ pub(crate) fn init( ) -> Result<(), metrics::SetRecorderError>, client: impl CloudWatch + Send + 'static, config: Config, + force_flush_stream: Option + Send>>>, ) { let _ = thread::spawn(move || { // single threaded @@ -178,7 +222,7 @@ pub(crate) fn init( .build() .unwrap(); runtime.block_on(async move { - match init_future(set_global_recorder, client, config).await { + match init_future(set_global_recorder, client, config, force_flush_stream).await { Err(e) => { log::warn!("{}", e); } @@ -194,22 +238,22 @@ pub(crate) async fn init_future( ) -> Result<(), metrics::SetRecorderError>, client: impl CloudWatch, config: Config, + force_flush_stream: Option + Send>>>, ) -> Result, Error> { - let (recorder, task) = new(client, config); + let (recorder, task) = new(client, config, force_flush_stream); set_global_recorder(recorder).map_err(Error::SetRecorder)?; Ok(task) } pub fn new( client: impl CloudWatch, - mut config: Config, + config: Config, + force_flush_stream: Option + Send>>>, ) -> (RecorderHandle, impl Future) { let (collect_sender, mut collect_receiver) = mpsc::channel(1024); let (emit_sender, emit_receiver) = mpsc::channel(config.metric_buffer_size); - let force_flush_stream = config - .force_flush_stream - .take() + let force_flush_stream = force_flush_stream .unwrap_or_else(|| { Box::pin(futures_util::stream::empty::<()>()) as Pin + Send>> }) @@ -232,18 +276,13 @@ pub fn new( .take_until(config.shutdown_signal.clone().map(|_| true)), ); - let emitter = mk_emitter( - emit_receiver, - client, - config.cloudwatch_namespace, - config.send_timeout_secs, - ); - let internal_config = CollectorConfig { - default_dimensions: config.default_dimensions, - storage_resolution: config.storage_resolution, + default_dimensions: config.default_dimensions.clone(), + storage_resolution: config.storage_resolution.clone(), }; + let emitter = mk_emitter(emit_receiver, client, config); + let mut collector = Collector::new(internal_config); let collection_fut = async move { collector.accept_messages(message_stream).await; @@ -291,41 +330,36 @@ where async fn mk_emitter( mut emit_receiver: mpsc::Receiver>, cloudwatch_client: impl CloudWatch, - cloudwatch_namespace: String, - send_timeout_secs: u64, + config: Config, ) { let cloudwatch_client = &cloudwatch_client; - let cloudwatch_namespace = &cloudwatch_namespace; while let Some(metrics) = emit_receiver.recv().await { - let chunks: Vec<_> = metrics_chunks(&metrics).collect(); - stream::iter(chunks) - .for_each(|metric_data| async move { - let send_fut = retry_on_throttled( - || async { - cloudwatch_client - .put_metric_data(cloudwatch_namespace.clone(), metric_data.to_owned()) - .await - }, - send_timeout_secs, - ); - match send_fut.await { - Ok(Ok(_output)) => { - log::debug!("Successfully sent a metrics batch to CloudWatch.") - } - Ok(Err(e)) => log::warn!( - "Failed to send metrics: {:?}: {}", - metric_data - .iter() - .map(|m| &m.metric_name) - .collect::>(), - e, - ), - Err(tokio::time::error::Elapsed { .. }) => { - log::warn!("Failed to send metrics: send timeout") - } + for metric_data in metrics_chunks(&metrics) { + let send_fut = retry_on_throttled( + || async { + cloudwatch_client + .put_metric_data(&config, metric_data.to_owned()) + .await + }, + config.send_timeout_secs, + ); + match send_fut.await { + Ok(Ok(_output)) => { + log::debug!("Successfully sent a metrics batch to CloudWatch.") } - }) - .await; + Ok(Err(e)) => log::warn!( + "Failed to send metrics: {:?}: {}", + metric_data + .iter() + .map(|m| &m.metric_name) + .collect::>(), + e, + ), + Err(tokio::time::error::Elapsed { .. }) => { + log::warn!("Failed to send metrics: send timeout") + } + } + } } } diff --git a/tests/common/mock.rs b/tests/common/mock.rs index ead3380..97461d6 100644 --- a/tests/common/mock.rs +++ b/tests/common/mock.rs @@ -7,7 +7,7 @@ use aws_sdk_cloudwatch::{ types::MetricDatum, }; use futures_util::FutureExt; -use metrics_cloudwatch::collector::CloudWatch; +use metrics_cloudwatch::collector::{CloudWatch, Config}; #[derive(Clone, Default)] pub struct MockCloudWatchClient { @@ -17,11 +17,11 @@ pub struct MockCloudWatchClient { impl CloudWatch for MockCloudWatchClient { fn put_metric_data( &self, - namespace: String, + config: &Config, data: Vec, ) -> metrics_cloudwatch::BoxFuture<'_, Result<(), SdkError>> { let data = PutMetricDataInput::builder() - .namespace(namespace) + .namespace(config.cloudwatch_namespace.clone()) .set_metric_data(Some(data)) .build() .unwrap(); diff --git a/tests/gzip.rs b/tests/gzip.rs new file mode 100644 index 0000000..8260362 --- /dev/null +++ b/tests/gzip.rs @@ -0,0 +1,35 @@ +#![cfg(feature = "integration-test")] + +use anyhow::Result; +use futures_util::FutureExt; + +#[tokio::test] +async fn test_gzip() -> Result<()> { + let sdk_config = aws_config::defaults(aws_config::BehaviorVersion::latest()) + .load() + .await; + + let client = aws_sdk_cloudwatch::Client::new(&sdk_config); + + let (shutdown_sender, receiver) = tokio::sync::oneshot::channel::<()>(); + + let driver = metrics_cloudwatch::Builder::new() + .shutdown_signal(receiver.map(|_| ()).boxed()) + .cloudwatch_namespace("metrics-cloudwatch-test") + .init_async(client, metrics::set_global_recorder) + .await?; + // Initialize the backend + let metrics_task = tokio::spawn(driver); + + for _ in 0..1000 { + metrics::counter!("requests").increment(1); + } + + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; + + drop(shutdown_sender); + + metrics_task.await?; + + Ok(()) +} diff --git a/tests/test.rs b/tests/test.rs index 71f4fa9..d8d00d4 100644 --- a/tests/test.rs +++ b/tests/test.rs @@ -1,4 +1,4 @@ -use std::{error::Error, time::Duration}; +use std::time::Duration; use aws_sdk_cloudwatch::types::{Dimension, StandardUnit, StatisticSet}; use futures_util::FutureExt; @@ -7,12 +7,14 @@ use common::MockCloudWatchClient; mod common; +use anyhow::Result; + fn dim(name: &str, value: &str) -> Dimension { Dimension::builder().name(name).value(value).build() } #[tokio::test] -async fn test_flush_on_shutdown() -> Result<(), Box> { +async fn test_flush_on_shutdown() -> Result<()> { let client = MockCloudWatchClient::default(); tokio::time::pause();