Skip to content

Commit

Permalink
feat: Allow put_metric_data to be sent gzipped
Browse files Browse the repository at this point in the history
No movement on awslabs/aws-sdk-rust#1068 for almost a year now so I think we need to just hack it in our selves
  • Loading branch information
Marwes committed Jan 22, 2025
1 parent 224c6e9 commit 7ffd898
Show file tree
Hide file tree
Showing 8 changed files with 206 additions and 93 deletions.
8 changes: 8 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,19 @@ metrics-util = { version = "0.19", features = ["registry"] }
ordered-float = "4"
rand = { version = "0.8", default-features = false, features = [ "small_rng", "std", "std_rng", ] }
tokio = { version = "1", features = ["sync", "time"] }
flate2 = { version = "1", optional = true }


[dev-dependencies]
anyhow = "1"
aws-config = "1"
criterion = { version = "0.5", features = ["async_tokio"] }
proptest = "1"
tokio = { version = "1", features = ["test-util", "macros", "rt-multi-thread"] }

[features]
gzip = ["flate2"]
integration-test = ["gzip"]

[[bench]]
name = "bench"
Expand Down
28 changes: 13 additions & 15 deletions benches/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,16 @@ fn simple(c: &mut Criterion) {
let cloudwatch_client = common::MockCloudWatchClient::default();

let (_shutdown_sender, receiver) = tokio::sync::oneshot::channel::<()>();
let (recorder, task) = collector::new(
cloudwatch_client,
collector::Config {
cloudwatch_namespace: "".into(),
default_dimensions: Default::default(),
storage_resolution: collector::Resolution::Second,
send_interval_secs: 200,
send_timeout_secs: 10,
shutdown_signal: receiver.map(|_| ()).boxed().shared(),
metric_buffer_size: 1024,
force_flush_stream: Some(Box::pin(futures_util::stream::empty())),
},
);
metrics::set_global_recorder(recorder).unwrap();
let task = metrics_cloudwatch::Builder::new()
.cloudwatch_namespace("")
.storage_resolution(collector::Resolution::Second)
.send_interval_secs(200)
.send_timeout_secs(10)
.shutdown_signal(receiver.map(|_| ()).boxed())
.metric_buffer_size(1024)
.init_future_mock(cloudwatch_client, metrics::set_global_recorder)
.await
.unwrap();
tokio::spawn(task);
});

Expand Down Expand Up @@ -71,8 +67,10 @@ fn simple(c: &mut Criterion) {
send_timeout_secs: 10,
shutdown_signal: receiver.map(|_| ()).boxed().shared(),
metric_buffer_size: 1024,
force_flush_stream: Some(Box::pin(futures_util::stream::empty())),
#[cfg(feature = "gzip")]
gzip: false,
},
Some(Box::pin(futures_util::stream::empty())),
);

let task = tokio::spawn(task);
Expand Down
78 changes: 54 additions & 24 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ pub struct Builder {
shutdown_signal: Option<BoxFuture<'static, ()>>,
metric_buffer_size: usize,
force_flush_stream: Option<Pin<Box<dyn Stream<Item = ()> + Send>>>,

#[cfg(feature = "gzip")]
gzip: bool,
}

fn extract_namespace(cloudwatch_namespace: Option<String>) -> Result<String, Error> {
Expand All @@ -39,6 +42,9 @@ impl Builder {
shutdown_signal: Default::default(),
metric_buffer_size: 2048,
force_flush_stream: Default::default(),

#[cfg(feature = "gzip")]
gzip: true,
}
}

Expand Down Expand Up @@ -114,6 +120,14 @@ impl Builder {
}
}

/// Whether to gzip the payload before sending it to CloudWatch.
///
/// Default: true
#[cfg(feature = "gzip")]
pub fn gzip(self, gzip: bool) -> Self {
Self { gzip, ..self }
}

/// Initializes the CloudWatch metrics backend and runs it in a new thread.
///
/// Expects the [metrics::set_global_recorder] function as an argument as a safeguard against
Expand All @@ -125,7 +139,8 @@ impl Builder {
RecorderHandle,
) -> Result<(), metrics::SetRecorderError<RecorderHandle>>,
) -> Result<(), Error> {
collector::init(set_global_recorder, client, self.build_config()?);
let (config, force_flush_stream) = self.build_config()?;
collector::init(set_global_recorder, client, config, force_flush_stream);
Ok(())
}

Expand All @@ -141,8 +156,9 @@ impl Builder {
RecorderHandle,
) -> Result<(), metrics::SetRecorderError<RecorderHandle>>,
) -> Result<(), Error> {
let (config, force_flush_stream) = self.build_config()?;
let driver =
collector::init_future(set_global_recorder, client, self.build_config()?).await?;
collector::init_future(set_global_recorder, client, config, force_flush_stream).await?;
driver.await;
Ok(())
}
Expand All @@ -159,7 +175,8 @@ impl Builder {
RecorderHandle,
) -> Result<(), metrics::SetRecorderError<RecorderHandle>>,
) -> Result<impl Future<Output = ()>, Error> {
collector::init_future(set_global_recorder, client, self.build_config()?).await
let (config, force_flush_stream) = self.build_config()?;
collector::init_future(set_global_recorder, client, config, force_flush_stream).await
}

#[doc(hidden)]
Expand All @@ -170,23 +187,30 @@ impl Builder {
RecorderHandle,
) -> Result<(), metrics::SetRecorderError<RecorderHandle>>,
) -> Result<impl Future<Output = ()>, Error> {
collector::init_future(set_global_recorder, client, self.build_config()?).await
}

fn build_config(self) -> Result<Config, Error> {
Ok(Config {
cloudwatch_namespace: extract_namespace(self.cloudwatch_namespace)?,
default_dimensions: self.default_dimensions,
storage_resolution: self.storage_resolution.unwrap_or(Resolution::Minute),
send_interval_secs: self.send_interval_secs.unwrap_or(10),
send_timeout_secs: self.send_timeout_secs.unwrap_or(4),
shutdown_signal: self
.shutdown_signal
.unwrap_or_else(|| Box::pin(future::pending()))
.shared(),
metric_buffer_size: self.metric_buffer_size,
force_flush_stream: self.force_flush_stream,
})
let (config, force_flush_stream) = self.build_config()?;
collector::init_future(set_global_recorder, client, config, force_flush_stream).await
}

fn build_config(
self,
) -> Result<(Config, Option<Pin<Box<dyn Stream<Item = ()> + Send>>>), Error> {
Ok((
Config {
cloudwatch_namespace: extract_namespace(self.cloudwatch_namespace)?,
default_dimensions: self.default_dimensions,
storage_resolution: self.storage_resolution.unwrap_or(Resolution::Minute),
send_interval_secs: self.send_interval_secs.unwrap_or(10),
send_timeout_secs: self.send_timeout_secs.unwrap_or(4),
shutdown_signal: self
.shutdown_signal
.unwrap_or_else(|| Box::pin(future::pending()))
.shared(),
metric_buffer_size: self.metric_buffer_size,
#[cfg(feature = "gzip")]
gzip: self.gzip,
},
self.force_flush_stream,
))
}
}

Expand All @@ -201,16 +225,22 @@ impl fmt::Debug for Builder {
shutdown_signal: _,
metric_buffer_size,
force_flush_stream: _,
#[cfg(feature = "gzip")]
gzip,
} = self;
f.debug_struct("Builder")
.field("cloudwatch_namespace", cloudwatch_namespace)
let mut f = f.debug_struct("Builder");
f.field("cloudwatch_namespace", cloudwatch_namespace)
.field("default_dimensions", default_dimensions)
.field("storage_resolution", storage_resolution)
.field("send_interval_secs", send_interval_secs)
.field("send_timeout_secs", send_timeout_secs)
.field("shutdown_signal", &"BoxFuture")
.field("metric_buffer_size", metric_buffer_size)
.field("force_flush_stream", &"dyn Stream")
.finish()
.field("force_flush_stream", &"dyn Stream");

#[cfg(feature = "gzip")]
f.field("gzip", gzip);

f.finish()
}
}
Loading

0 comments on commit 7ffd898

Please sign in to comment.