From 9cf38ef3fcc785aa5e845545a4fac3b5396184bb Mon Sep 17 00:00:00 2001 From: Abdullah Yildirim Date: Sun, 15 Mar 2026 18:11:24 -0700 Subject: [PATCH 1/8] chore: emit SQS metrics Signed-off-by: Abdullah Yildirim --- rust/extns/numaflow-sqs/src/sink.rs | 2 + rust/numaflow-core/src/metrics.rs | 60 +++++++++++++++++++++++ rust/numaflow-core/src/sinker/sink/sqs.rs | 36 +++++++++++++- 3 files changed, 96 insertions(+), 2 deletions(-) diff --git a/rust/extns/numaflow-sqs/src/sink.rs b/rust/extns/numaflow-sqs/src/sink.rs index 04fdb1114d..5a7d5375a0 100644 --- a/rust/extns/numaflow-sqs/src/sink.rs +++ b/rust/extns/numaflow-sqs/src/sink.rs @@ -98,6 +98,7 @@ impl TryFrom for SendMessageBatchRequestEntry { pub struct SqsSink { client: Client, queue_url: &'static str, + pub queue_name: &'static str, } /// Builder for creating and configuring an SQS sink. @@ -195,6 +196,7 @@ impl SqsSinkBuilder { Ok(SqsSink { client: sqs_client.clone(), queue_url: Box::leak(queue_url.clone().to_string().into_boxed_str()), + queue_name: self.config.queue_name, }) } } diff --git a/rust/numaflow-core/src/metrics.rs b/rust/numaflow-core/src/metrics.rs index f46574ddaf..8d14d6b23b 100644 --- a/rust/numaflow-core/src/metrics.rs +++ b/rust/numaflow-core/src/metrics.rs @@ -151,6 +151,13 @@ const JETSTREAM_ISB_READ_TIME_TOTAL: &str = "read_time_total"; const JETSTREAM_ISB_WRITE_TIME_TOTAL: &str = "write_time_total"; const JETSTREAM_ISB_ACK_TIME_TOTAL: &str = "ack_time_total"; +// SQS related metric configs +const SQS_REGISTRY_PREFIX: &str = "sqs"; +const SQS_PRODUCER_REGISTRY_PREFIX: &str = "producer"; +const SQS_PRODUCER_PUBLISH_SUCCESS_TOTAL: &str = "publish_success"; +const SQS_PRODUCER_PUBLISH_FAILURE_TOTAL: &str = "publish_failure"; +const SQS_PRODUCER_PUBLISH_LATENCY: &str = "publish_latency"; + /// A deep healthcheck for components. Each component should implement IsReady for both builtins and /// user-defined containers. #[derive(Clone)] @@ -533,6 +540,54 @@ impl JetStreamISBMetrics { } } +pub(crate) struct SQSProducerMetrics { + pub(crate) publish_latency: Family, Histogram>, + pub(crate) publish_success: Family, Counter>, + pub(crate) publish_failure: Family, Counter>, +} + +pub(crate) struct SQSMetrics { + pub(crate) producer: SQSProducerMetrics, +} + +impl SQSMetrics { + pub(crate) fn new() -> Self { + let metrics = Self { + producer: SQSProducerMetrics { + publish_latency: Family::, Histogram>::new_with_constructor( + || Histogram::new(exponential_buckets_range(100.0, 60000000.0 * 15.0, 10)), + ), + publish_success: Family::, Counter>::default(), + publish_failure: Family::, Counter>::default(), + } + }; + + let mut registry = global_registry().registry.lock(); + let sqs_registry = registry.sub_registry_with_prefix(SQS_REGISTRY_PREFIX); + let producer_registery = sqs_registry.sub_registry_with_prefix(SQS_PRODUCER_REGISTRY_PREFIX); + + producer_registery.register( + SQS_PRODUCER_PUBLISH_LATENCY, + "Latency of SQS publish operations in microseconds", + metrics.producer.publish_latency.clone(), + ); + + producer_registery.register( + SQS_PRODUCER_PUBLISH_SUCCESS_TOTAL, + "Number of messages successfully published to SQS", + metrics.producer.publish_success.clone(), + ); + + producer_registery.register( + SQS_PRODUCER_PUBLISH_FAILURE_TOTAL, + "Number of messages that failed to publish to SQS", + metrics.producer.publish_failure.clone() + ); + + metrics + } +} + /// Exponential bucket distribution with range. /// Creates `length` buckets, where the lowest bucket is `min` and the highest bucket is `max`. /// The final +Inf bucket is not counted and not included in the returned iterator. @@ -1063,6 +1118,11 @@ pub(crate) fn pipeline_metrics() -> &'static PipelineMetrics { PIPELINE_METRICS.get_or_init(PipelineMetrics::new) } +static SQS_METRICS: OnceLock = OnceLock::new(); +pub(crate) fn sqs_metrics() -> &'static SQSMetrics { + SQS_METRICS.get_or_init(SQSMetrics::new) +} + // sdk_info_labels is a helper function used to build the labels used in sdk_info pub(crate) fn sdk_info_labels( component: String, diff --git a/rust/numaflow-core/src/sinker/sink/sqs.rs b/rust/numaflow-core/src/sinker/sink/sqs.rs index c81ed3c4a3..ffe2c7acb5 100644 --- a/rust/numaflow-core/src/sinker/sink/sqs.rs +++ b/rust/numaflow-core/src/sinker/sink/sqs.rs @@ -1,3 +1,4 @@ +use std::time::Instant; use numaflow_sqs::{ SQS_METADATA_KEY, sink::{SqsSink, SqsSinkMessage}, @@ -7,6 +8,7 @@ use crate::error; use crate::error::Error; use crate::message::Message; use crate::sinker::sink::{ResponseFromSink, ResponseStatusFromSink, Sink}; +use crate::metrics::sqs_metrics; impl TryFrom for SqsSinkMessage { type Error = error::Error; @@ -61,17 +63,33 @@ impl From for Error { impl Sink for SqsSink { async fn sink(&mut self, messages: Vec) -> error::Result> { let mut result = Vec::with_capacity(messages.len()); + let labels = vec![("queue_name".to_string(), self.queue_name.to_string())]; let sqs_messages: Vec = messages .iter() .map(|msg| SqsSinkMessage::try_from(msg.clone())) .collect::, _>>()?; + let message_count = sqs_messages.len(); + + let start = Instant::now(); let sqs_sink_result = self.sink_messages(sqs_messages).await; + let elapsed = start.elapsed().as_micros() as f64; + + sqs_metrics().producer.publish_latency.get_or_create(&labels).observe(elapsed); - if sqs_sink_result.is_err() { - return Err(Error::from(sqs_sink_result.err().unwrap())); + if let Err(e) = sqs_sink_result { + sqs_metrics() + .producer + .publish_failure + .get_or_create(&labels) + .inc_by(message_count as u64); + return Err(Error::from(e)); } + + let mut success_count = 0u64; + let mut failure_count = 0u64; + for sqs_response in sqs_sink_result?.iter() { match &sqs_response.status { Ok(_) => { @@ -79,15 +97,29 @@ impl Sink for SqsSink { id: sqs_response.id.clone(), status: ResponseStatusFromSink::Success, }); + success_count += 1; } Err(err) => { result.push(ResponseFromSink { id: sqs_response.id.clone(), status: ResponseStatusFromSink::Failed(err.to_string()), }); + failure_count += 1; } } } + + sqs_metrics() + .producer + .publish_success + .get_or_create(&labels) + .inc_by(success_count); + sqs_metrics() + .producer + .publish_failure + .get_or_create(&labels) + .inc_by(failure_count); + Ok(result) } } From 4869938fac650bbae70f49f0950a8244bb377c53 Mon Sep 17 00:00:00 2001 From: Abdullah Yildirim Date: Sun, 15 Mar 2026 18:26:11 -0700 Subject: [PATCH 2/8] cargo fmt Signed-off-by: Abdullah Yildirim --- rust/numaflow-core/src/metrics.rs | 7 ++++--- rust/numaflow-core/src/sinker/sink/sqs.rs | 10 +++++++--- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/rust/numaflow-core/src/metrics.rs b/rust/numaflow-core/src/metrics.rs index 8d14d6b23b..04e4ef19ac 100644 --- a/rust/numaflow-core/src/metrics.rs +++ b/rust/numaflow-core/src/metrics.rs @@ -559,12 +559,13 @@ impl SQSMetrics { ), publish_success: Family::, Counter>::default(), publish_failure: Family::, Counter>::default(), - } + }, }; let mut registry = global_registry().registry.lock(); let sqs_registry = registry.sub_registry_with_prefix(SQS_REGISTRY_PREFIX); - let producer_registery = sqs_registry.sub_registry_with_prefix(SQS_PRODUCER_REGISTRY_PREFIX); + let producer_registery = + sqs_registry.sub_registry_with_prefix(SQS_PRODUCER_REGISTRY_PREFIX); producer_registery.register( SQS_PRODUCER_PUBLISH_LATENCY, @@ -581,7 +582,7 @@ impl SQSMetrics { producer_registery.register( SQS_PRODUCER_PUBLISH_FAILURE_TOTAL, "Number of messages that failed to publish to SQS", - metrics.producer.publish_failure.clone() + metrics.producer.publish_failure.clone(), ); metrics diff --git a/rust/numaflow-core/src/sinker/sink/sqs.rs b/rust/numaflow-core/src/sinker/sink/sqs.rs index ffe2c7acb5..36b58dd227 100644 --- a/rust/numaflow-core/src/sinker/sink/sqs.rs +++ b/rust/numaflow-core/src/sinker/sink/sqs.rs @@ -1,14 +1,14 @@ -use std::time::Instant; use numaflow_sqs::{ SQS_METADATA_KEY, sink::{SqsSink, SqsSinkMessage}, }; +use std::time::Instant; use crate::error; use crate::error::Error; use crate::message::Message; -use crate::sinker::sink::{ResponseFromSink, ResponseStatusFromSink, Sink}; use crate::metrics::sqs_metrics; +use crate::sinker::sink::{ResponseFromSink, ResponseStatusFromSink, Sink}; impl TryFrom for SqsSinkMessage { type Error = error::Error; @@ -76,7 +76,11 @@ impl Sink for SqsSink { let sqs_sink_result = self.sink_messages(sqs_messages).await; let elapsed = start.elapsed().as_micros() as f64; - sqs_metrics().producer.publish_latency.get_or_create(&labels).observe(elapsed); + sqs_metrics() + .producer + .publish_latency + .get_or_create(&labels) + .observe(elapsed); if let Err(e) = sqs_sink_result { sqs_metrics() From 7aacf23e7d2c3a94b6bbfc80429c35a3bed695ea Mon Sep 17 00:00:00 2001 From: Abdullah Yildirim Date: Sun, 15 Mar 2026 18:56:55 -0700 Subject: [PATCH 3/8] update unit tests Signed-off-by: Abdullah Yildirim --- rust/extns/numaflow-sqs/src/sink.rs | 2 ++ rust/numaflow-core/src/metrics.rs | 25 ++++++++++++++++++++++++- 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/rust/extns/numaflow-sqs/src/sink.rs b/rust/extns/numaflow-sqs/src/sink.rs index 5a7d5375a0..74050adecf 100644 --- a/rust/extns/numaflow-sqs/src/sink.rs +++ b/rust/extns/numaflow-sqs/src/sink.rs @@ -342,6 +342,8 @@ mod tests { sink.queue_url, "https://sqs.us-west-2.amazonaws.com/926113353675/test-q/" ); + + assert_eq!(sink.queue_name, "test-q"); } #[test(tokio::test)] diff --git a/rust/numaflow-core/src/metrics.rs b/rust/numaflow-core/src/metrics.rs index 04e4ef19ac..5bd2e1e6e8 100644 --- a/rust/numaflow-core/src/metrics.rs +++ b/rust/numaflow-core/src/metrics.rs @@ -1973,6 +1973,24 @@ mod tests { .write_error_total .get_or_create(&jetstream_isb_error_labels) .inc(); + // populate sqs metrics + let sqs_metrics = sqs_metrics(); + let sqs_labels = vec![("queue_name".to_string(), "test-queue".to_string())]; + sqs_metrics + .producer + .publish_success + .get_or_create(&sqs_labels) + .inc_by(5); + sqs_metrics + .producer + .publish_failure + .get_or_create(&sqs_labels) + .inc_by(2); + sqs_metrics + .producer + .publish_latency + .get_or_create(&sqs_labels) + .observe(1500.0); // Validate the metric names let state = global_registry().registry.lock(); @@ -2017,6 +2035,11 @@ mod tests { r#"isb_jetstream_write_error_total{buffer="test_jetstream_isb",reason="test_error"} 1"#, r#"isb_jetstream_buffer_soft_usage{buffer="test_jetstream_isb"} 0.22"#, r#"isb_jetstream_buffer_pending{buffer="test_jetstream_isb"} 5"#, + r#"sqs_producer_publish_success_total{queue_name="test-queue"} 5"#, + r#"sqs_producer_publish_failure_total{queue_name="test-queue"} 2"#, + r#"sqs_producer_publish_latency_sum{queue_name="test-queue"} 1500.0"#, + r#"sqs_producer_publish_latency_count{queue_name="test-queue"} 1"#, + r#"sqs_producer_publish_latency_bucket{le="100.0",queue_name="test-queue"} 0"#, ]; let got = buffer @@ -2027,7 +2050,7 @@ mod tests { .join("\n"); for t in expected { - assert!(got.contains(t)); + assert!(got.contains(t), "expected metric not found: {}", t); } } } From 557d6b3d3d4ef4344e0090c59bc466dbff438479 Mon Sep 17 00:00:00 2001 From: Abdullah Yildirim Date: Sun, 15 Mar 2026 19:05:35 -0700 Subject: [PATCH 4/8] cargo fmt Signed-off-by: Abdullah Yildirim --- rust/extns/numaflow-sqs/src/sink.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/extns/numaflow-sqs/src/sink.rs b/rust/extns/numaflow-sqs/src/sink.rs index 74050adecf..70c1dc5506 100644 --- a/rust/extns/numaflow-sqs/src/sink.rs +++ b/rust/extns/numaflow-sqs/src/sink.rs @@ -342,7 +342,7 @@ mod tests { sink.queue_url, "https://sqs.us-west-2.amazonaws.com/926113353675/test-q/" ); - + assert_eq!(sink.queue_name, "test-q"); } From c65227e1120592b2aed896ceb57231c99bc99380 Mon Sep 17 00:00:00 2001 From: Abdullah Yildirim Date: Sun, 15 Mar 2026 21:13:23 -0700 Subject: [PATCH 5/8] update metrics docs Signed-off-by: Abdullah Yildirim --- docs/operations/metrics/metrics.md | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/docs/operations/metrics/metrics.md b/docs/operations/metrics/metrics.md index 4be52a9632..fc2202c9b0 100644 --- a/docs/operations/metrics/metrics.md +++ b/docs/operations/metrics/metrics.md @@ -154,6 +154,18 @@ These metrics are emitted by pipeline vertex pods for NATS JetStream Inter-Step | `isb_jetstream_write_time_total` | Histogram | `buffer=` | Processing times of JetStream write operations, in microseconds | | `isb_jetstream_ack_time_total` | Histogram | `buffer=` | Processing times of JetStream ack operations, in microseconds | +## SQS Metrics + +These metrics are emitted when using SQS as a sink. + +### Producer Metrics + +| Metric name | Metric type | Labels | Description | +|------------------------------------------|-------------|----------------------------|----------------------------------------------------------------| +| `sqs_producer_publish_success_total` | Counter | `queue_name=` | Total number of messages successfully published to SQS | +| `sqs_producer_publish_failure_total` | Counter | `queue_name=` | Total number of messages that failed to publish to SQS | +| `sqs_producer_publish_latency` | Histogram | `queue_name=` | Latency of SQS publish operations, in microseconds | + ## Controller Metrics These metrics are emitted by the Numaflow controller. From 7037db92abf7e70771d93c96aa483a3f282053cb Mon Sep 17 00:00:00 2001 From: Abdullah Yildirim Date: Sun, 15 Mar 2026 21:17:37 -0700 Subject: [PATCH 6/8] update metrics docs Signed-off-by: Abdullah Yildirim --- docs/operations/metrics/metrics.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/operations/metrics/metrics.md b/docs/operations/metrics/metrics.md index fc2202c9b0..57bc8c3e2d 100644 --- a/docs/operations/metrics/metrics.md +++ b/docs/operations/metrics/metrics.md @@ -156,7 +156,7 @@ These metrics are emitted by pipeline vertex pods for NATS JetStream Inter-Step ## SQS Metrics -These metrics are emitted when using SQS as a sink. +These metrics are emitted when using SQS. ### Producer Metrics From 33a0932b3cdb661f85a4976121628761ea8d2aaa Mon Sep 17 00:00:00 2001 From: Abdullah Yildirim Date: Mon, 16 Mar 2026 13:22:54 -0700 Subject: [PATCH 7/8] document queue_name field Signed-off-by: Abdullah Yildirim --- rust/extns/numaflow-sqs/src/sink.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/rust/extns/numaflow-sqs/src/sink.rs b/rust/extns/numaflow-sqs/src/sink.rs index 70c1dc5506..f52fc5a84e 100644 --- a/rust/extns/numaflow-sqs/src/sink.rs +++ b/rust/extns/numaflow-sqs/src/sink.rs @@ -98,6 +98,7 @@ impl TryFrom for SendMessageBatchRequestEntry { pub struct SqsSink { client: Client, queue_url: &'static str, + /// The name of the SQS queue, used as a label in metrics. pub queue_name: &'static str, } From 978f62bfe6b4112443890725fa1506d62b7dcfa1 Mon Sep 17 00:00:00 2001 From: Abdullah Yildirim Date: Mon, 16 Mar 2026 14:00:44 -0700 Subject: [PATCH 8/8] refactor Signed-off-by: Abdullah Yildirim --- .../src/{metrics.rs => metrics/mod.rs} | 72 ++----------------- rust/numaflow-core/src/metrics/sqs.rs | 67 +++++++++++++++++ 2 files changed, 74 insertions(+), 65 deletions(-) rename rust/numaflow-core/src/{metrics.rs => metrics/mod.rs} (97%) create mode 100644 rust/numaflow-core/src/metrics/sqs.rs diff --git a/rust/numaflow-core/src/metrics.rs b/rust/numaflow-core/src/metrics/mod.rs similarity index 97% rename from rust/numaflow-core/src/metrics.rs rename to rust/numaflow-core/src/metrics/mod.rs index 5bd2e1e6e8..60a875373b 100644 --- a/rust/numaflow-core/src/metrics.rs +++ b/rust/numaflow-core/src/metrics/mod.rs @@ -33,6 +33,9 @@ use crate::sinker::sink::SinkWriter; use crate::source::Source; use crate::watermark::WatermarkHandle; +pub(crate) mod sqs; +pub(crate) use sqs::sqs_metrics; + // SDK information const SDK_INFO: &str = "sdk_info"; const COMPONENT: &str = "component"; @@ -151,13 +154,6 @@ const JETSTREAM_ISB_READ_TIME_TOTAL: &str = "read_time_total"; const JETSTREAM_ISB_WRITE_TIME_TOTAL: &str = "write_time_total"; const JETSTREAM_ISB_ACK_TIME_TOTAL: &str = "ack_time_total"; -// SQS related metric configs -const SQS_REGISTRY_PREFIX: &str = "sqs"; -const SQS_PRODUCER_REGISTRY_PREFIX: &str = "producer"; -const SQS_PRODUCER_PUBLISH_SUCCESS_TOTAL: &str = "publish_success"; -const SQS_PRODUCER_PUBLISH_FAILURE_TOTAL: &str = "publish_failure"; -const SQS_PRODUCER_PUBLISH_LATENCY: &str = "publish_latency"; - /// A deep healthcheck for components. Each component should implement IsReady for both builtins and /// user-defined containers. #[derive(Clone)] @@ -540,55 +536,6 @@ impl JetStreamISBMetrics { } } -pub(crate) struct SQSProducerMetrics { - pub(crate) publish_latency: Family, Histogram>, - pub(crate) publish_success: Family, Counter>, - pub(crate) publish_failure: Family, Counter>, -} - -pub(crate) struct SQSMetrics { - pub(crate) producer: SQSProducerMetrics, -} - -impl SQSMetrics { - pub(crate) fn new() -> Self { - let metrics = Self { - producer: SQSProducerMetrics { - publish_latency: Family::, Histogram>::new_with_constructor( - || Histogram::new(exponential_buckets_range(100.0, 60000000.0 * 15.0, 10)), - ), - publish_success: Family::, Counter>::default(), - publish_failure: Family::, Counter>::default(), - }, - }; - - let mut registry = global_registry().registry.lock(); - let sqs_registry = registry.sub_registry_with_prefix(SQS_REGISTRY_PREFIX); - let producer_registery = - sqs_registry.sub_registry_with_prefix(SQS_PRODUCER_REGISTRY_PREFIX); - - producer_registery.register( - SQS_PRODUCER_PUBLISH_LATENCY, - "Latency of SQS publish operations in microseconds", - metrics.producer.publish_latency.clone(), - ); - - producer_registery.register( - SQS_PRODUCER_PUBLISH_SUCCESS_TOTAL, - "Number of messages successfully published to SQS", - metrics.producer.publish_success.clone(), - ); - - producer_registery.register( - SQS_PRODUCER_PUBLISH_FAILURE_TOTAL, - "Number of messages that failed to publish to SQS", - metrics.producer.publish_failure.clone(), - ); - - metrics - } -} - /// Exponential bucket distribution with range. /// Creates `length` buckets, where the lowest bucket is `min` and the highest bucket is `max`. /// The final +Inf bucket is not counted and not included in the returned iterator. @@ -788,8 +735,8 @@ impl MonoVtxMetrics { metrics.fb_sink.write_total.clone(), ); fb_sink_registry.register(FALLBACK_SINK_TIME, - "A Histogram to keep track of the total time taken to Write to the fallback sink, in microseconds", - metrics.fb_sink.time.clone()); + "A Histogram to keep track of the total time taken to Write to the fallback sink, in microseconds", + metrics.fb_sink.time.clone()); // OnSuccess Sink metrics let ons_sink_registry = registry.sub_registry_with_prefix(ON_SUCCESS_SINK_REGISTRY_PREFIX); @@ -800,8 +747,8 @@ impl MonoVtxMetrics { metrics.ons_sink.write_total.clone(), ); ons_sink_registry.register(ON_SUCCESS_SINK_TIME, - "A Histogram to keep track of the total time taken to Write to the on-success sink, in microseconds", - metrics.ons_sink.time.clone()); + "A Histogram to keep track of the total time taken to Write to the on-success sink, in microseconds", + metrics.ons_sink.time.clone()); metrics } @@ -1119,11 +1066,6 @@ pub(crate) fn pipeline_metrics() -> &'static PipelineMetrics { PIPELINE_METRICS.get_or_init(PipelineMetrics::new) } -static SQS_METRICS: OnceLock = OnceLock::new(); -pub(crate) fn sqs_metrics() -> &'static SQSMetrics { - SQS_METRICS.get_or_init(SQSMetrics::new) -} - // sdk_info_labels is a helper function used to build the labels used in sdk_info pub(crate) fn sdk_info_labels( component: String, diff --git a/rust/numaflow-core/src/metrics/sqs.rs b/rust/numaflow-core/src/metrics/sqs.rs new file mode 100644 index 0000000000..ece1c86479 --- /dev/null +++ b/rust/numaflow-core/src/metrics/sqs.rs @@ -0,0 +1,67 @@ +use prometheus_client::metrics::counter::Counter; +use prometheus_client::metrics::family::Family; +use prometheus_client::metrics::histogram::Histogram; +use std::sync::OnceLock; + +use super::{exponential_buckets_range, global_registry}; + +// SQS related metric configs +const SQS_REGISTRY_PREFIX: &str = "sqs"; +const SQS_PRODUCER_REGISTRY_PREFIX: &str = "producer"; +const SQS_PRODUCER_PUBLISH_SUCCESS_TOTAL: &str = "publish_success"; +const SQS_PRODUCER_PUBLISH_FAILURE_TOTAL: &str = "publish_failure"; +const SQS_PRODUCER_PUBLISH_LATENCY: &str = "publish_latency"; + +pub(crate) struct SQSProducerMetrics { + pub(crate) publish_latency: Family, Histogram>, + pub(crate) publish_success: Family, Counter>, + pub(crate) publish_failure: Family, Counter>, +} + +pub(crate) struct SQSMetrics { + pub(crate) producer: SQSProducerMetrics, +} + +impl SQSMetrics { + pub(crate) fn new() -> Self { + let metrics = Self { + producer: SQSProducerMetrics { + publish_latency: Family::, Histogram>::new_with_constructor( + || Histogram::new(exponential_buckets_range(100.0, 60000000.0 * 15.0, 10)), + ), + publish_success: Family::, Counter>::default(), + publish_failure: Family::, Counter>::default(), + }, + }; + + let mut registry = global_registry().registry.lock(); + let sqs_registry = registry.sub_registry_with_prefix(SQS_REGISTRY_PREFIX); + let producer_registery = + sqs_registry.sub_registry_with_prefix(SQS_PRODUCER_REGISTRY_PREFIX); + + producer_registery.register( + SQS_PRODUCER_PUBLISH_LATENCY, + "Latency of SQS publish operations in microseconds", + metrics.producer.publish_latency.clone(), + ); + + producer_registery.register( + SQS_PRODUCER_PUBLISH_SUCCESS_TOTAL, + "Number of messages successfully published to SQS", + metrics.producer.publish_success.clone(), + ); + + producer_registery.register( + SQS_PRODUCER_PUBLISH_FAILURE_TOTAL, + "Number of messages that failed to publish to SQS", + metrics.producer.publish_failure.clone(), + ); + + metrics + } +} + +static SQS_METRICS: OnceLock = OnceLock::new(); +pub(crate) fn sqs_metrics() -> &'static SQSMetrics { + SQS_METRICS.get_or_init(SQSMetrics::new) +}