Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions docs/operations/metrics/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,18 @@ These metrics are emitted by pipeline vertex pods for NATS JetStream Inter-Step
| `isb_jetstream_write_time_total` | Histogram | `buffer=<buffer-name>` | Processing times of JetStream write operations, in microseconds |
| `isb_jetstream_ack_time_total` | Histogram | `buffer=<buffer-name>` | Processing times of JetStream ack operations, in microseconds |

## SQS Metrics

These metrics are emitted when publishing messages to SQS (i.e., when an SQS sink is configured).

### Producer Metrics

| Metric name | Metric type | Labels | Description |
|------------------------------------------|-------------|----------------------------|----------------------------------------------------------------|
| `sqs_producer_publish_success_total` | Counter | `queue_name=<queue-name>` | Total number of messages successfully published to SQS |
| `sqs_producer_publish_failure_total` | Counter | `queue_name=<queue-name>` | Total number of messages that failed to publish to SQS |
| `sqs_producer_publish_latency` | Histogram | `queue_name=<queue-name>` | Latency of SQS publish operations, in microseconds |

## Controller Metrics

These metrics are emitted by the Numaflow controller.
Expand Down
5 changes: 5 additions & 0 deletions rust/extns/numaflow-sqs/src/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ impl TryFrom<BatchEntryInput> for SendMessageBatchRequestEntry {
pub struct SqsSink {
client: Client,
queue_url: &'static str,
/// The name of the SQS queue, used as a label in metrics.
pub queue_name: &'static str,
}

/// Builder for creating and configuring an SQS sink.
Expand Down Expand Up @@ -195,6 +197,7 @@ impl SqsSinkBuilder {
Ok(SqsSink {
client: sqs_client.clone(),
queue_url: Box::leak(queue_url.clone().to_string().into_boxed_str()),
queue_name: self.config.queue_name,
})
}
}
Expand Down Expand Up @@ -340,6 +343,8 @@ mod tests {
sink.queue_url,
"https://sqs.us-west-2.amazonaws.com/926113353675/test-q/"
);

assert_eq!(sink.queue_name, "test-q");
}

#[test(tokio::test)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ use crate::sinker::sink::SinkWriter;
use crate::source::Source;
use crate::watermark::WatermarkHandle;

pub(crate) mod sqs;
pub(crate) use sqs::sqs_metrics;

// SDK information
const SDK_INFO: &str = "sdk_info";
const COMPONENT: &str = "component";
Expand Down Expand Up @@ -732,8 +735,8 @@ impl MonoVtxMetrics {
metrics.fb_sink.write_total.clone(),
);
fb_sink_registry.register(FALLBACK_SINK_TIME,
"A Histogram to keep track of the total time taken to Write to the fallback sink, in microseconds",
metrics.fb_sink.time.clone());
"A Histogram to keep track of the total time taken to Write to the fallback sink, in microseconds",
metrics.fb_sink.time.clone());

// OnSuccess Sink metrics
let ons_sink_registry = registry.sub_registry_with_prefix(ON_SUCCESS_SINK_REGISTRY_PREFIX);
Expand All @@ -744,8 +747,8 @@ impl MonoVtxMetrics {
metrics.ons_sink.write_total.clone(),
);
ons_sink_registry.register(ON_SUCCESS_SINK_TIME,
"A Histogram to keep track of the total time taken to Write to the on-success sink, in microseconds",
metrics.ons_sink.time.clone());
"A Histogram to keep track of the total time taken to Write to the on-success sink, in microseconds",
metrics.ons_sink.time.clone());

metrics
}
Expand Down Expand Up @@ -1912,6 +1915,24 @@ mod tests {
.write_error_total
.get_or_create(&jetstream_isb_error_labels)
.inc();
// populate sqs metrics
let sqs_metrics = sqs_metrics();
let sqs_labels = vec![("queue_name".to_string(), "test-queue".to_string())];
sqs_metrics
.producer
.publish_success
.get_or_create(&sqs_labels)
.inc_by(5);
sqs_metrics
.producer
.publish_failure
.get_or_create(&sqs_labels)
.inc_by(2);
sqs_metrics
.producer
.publish_latency
.get_or_create(&sqs_labels)
.observe(1500.0);

// Validate the metric names
let state = global_registry().registry.lock();
Expand Down Expand Up @@ -1956,6 +1977,11 @@ mod tests {
r#"isb_jetstream_write_error_total{buffer="test_jetstream_isb",reason="test_error"} 1"#,
r#"isb_jetstream_buffer_soft_usage{buffer="test_jetstream_isb"} 0.22"#,
r#"isb_jetstream_buffer_pending{buffer="test_jetstream_isb"} 5"#,
r#"sqs_producer_publish_success_total{queue_name="test-queue"} 5"#,
r#"sqs_producer_publish_failure_total{queue_name="test-queue"} 2"#,
r#"sqs_producer_publish_latency_sum{queue_name="test-queue"} 1500.0"#,
r#"sqs_producer_publish_latency_count{queue_name="test-queue"} 1"#,
r#"sqs_producer_publish_latency_bucket{le="100.0",queue_name="test-queue"} 0"#,
];

let got = buffer
Expand All @@ -1966,7 +1992,7 @@ mod tests {
.join("\n");

for t in expected {
assert!(got.contains(t));
assert!(got.contains(t), "expected metric not found: {}", t);
}
}
}
67 changes: 67 additions & 0 deletions rust/numaflow-core/src/metrics/sqs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::family::Family;
use prometheus_client::metrics::histogram::Histogram;
use std::sync::OnceLock;

use super::{exponential_buckets_range, global_registry};

// SQS related metric configs
// The two registry prefixes compose the exported metric names as
// `sqs_producer_<metric>` (counters additionally get a `_total` suffix on
// export, e.g. `sqs_producer_publish_success_total`).
const SQS_REGISTRY_PREFIX: &str = "sqs";
const SQS_PRODUCER_REGISTRY_PREFIX: &str = "producer";
const SQS_PRODUCER_PUBLISH_SUCCESS_TOTAL: &str = "publish_success";
const SQS_PRODUCER_PUBLISH_FAILURE_TOTAL: &str = "publish_failure";
const SQS_PRODUCER_PUBLISH_LATENCY: &str = "publish_latency";

/// Metrics for the SQS producer (publish) path. Each family is keyed by a
/// dynamic label vector; callers attach a `queue_name` label.
pub(crate) struct SQSProducerMetrics {
    // Histogram of publish operation latencies, in microseconds.
    pub(crate) publish_latency: Family<Vec<(String, String)>, Histogram>,
    // Count of messages successfully published to SQS.
    pub(crate) publish_success: Family<Vec<(String, String)>, Counter>,
    // Count of messages that failed to publish to SQS.
    pub(crate) publish_failure: Family<Vec<(String, String)>, Counter>,
}

/// Top-level container for all SQS metric families, registered once with the
/// global registry and accessed via [`sqs_metrics`].
pub(crate) struct SQSMetrics {
    // Producer-side (publish) metrics.
    pub(crate) producer: SQSProducerMetrics,
}

impl SQSMetrics {
    /// Creates the SQS metric families and registers them with the global
    /// registry under the `sqs` / `producer` sub-registry prefixes, so they
    /// are exported as `sqs_producer_*`.
    ///
    /// Intended to be called exactly once via [`sqs_metrics`]; registering the
    /// same names twice in the shared registry would duplicate entries.
    pub(crate) fn new() -> Self {
        // Build the families first, then register clones of their handles.
        // Latency buckets span 100us up to ~15 minutes, exponentially spaced.
        let publish_latency = Family::<Vec<(String, String)>, Histogram>::new_with_constructor(
            || Histogram::new(exponential_buckets_range(100.0, 60000000.0 * 15.0, 10)),
        );
        let publish_success = Family::<Vec<(String, String)>, Counter>::default();
        let publish_failure = Family::<Vec<(String, String)>, Counter>::default();

        {
            // Scope the lock so the registry guard is released before returning.
            let mut registry = global_registry().registry.lock();
            let producer_registry = registry
                .sub_registry_with_prefix(SQS_REGISTRY_PREFIX)
                .sub_registry_with_prefix(SQS_PRODUCER_REGISTRY_PREFIX);

            producer_registry.register(
                SQS_PRODUCER_PUBLISH_LATENCY,
                "Latency of SQS publish operations in microseconds",
                publish_latency.clone(),
            );
            producer_registry.register(
                SQS_PRODUCER_PUBLISH_SUCCESS_TOTAL,
                "Number of messages successfully published to SQS",
                publish_success.clone(),
            );
            producer_registry.register(
                SQS_PRODUCER_PUBLISH_FAILURE_TOTAL,
                "Number of messages that failed to publish to SQS",
                publish_failure.clone(),
            );
        }

        Self {
            producer: SQSProducerMetrics {
                publish_latency,
                publish_success,
                publish_failure,
            },
        }
    }
}

// Process-wide singleton; `OnceLock` guarantees `SQSMetrics::new` (and thus
// registry registration) runs at most once, even under concurrent callers.
static SQS_METRICS: OnceLock<SQSMetrics> = OnceLock::new();
/// Returns the global SQS metrics, lazily initializing and registering them
/// on first call.
pub(crate) fn sqs_metrics() -> &'static SQSMetrics {
    SQS_METRICS.get_or_init(SQSMetrics::new)
}
42 changes: 39 additions & 3 deletions rust/numaflow-core/src/sinker/sink/sqs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ use numaflow_sqs::{
SQS_METADATA_KEY,
sink::{SqsSink, SqsSinkMessage},
};
use std::time::Instant;

use crate::error;
use crate::error::Error;
use crate::message::Message;
use crate::metrics::sqs_metrics;
use crate::sinker::sink::{ResponseFromSink, ResponseStatusFromSink, Sink};

impl TryFrom<Message> for SqsSinkMessage {
Expand Down Expand Up @@ -61,33 +63,67 @@ impl From<numaflow_sqs::SqsSinkError> for Error {
impl Sink for SqsSink {
async fn sink(&mut self, messages: Vec<Message>) -> error::Result<Vec<ResponseFromSink>> {
let mut result = Vec::with_capacity(messages.len());
let labels = vec![("queue_name".to_string(), self.queue_name.to_string())];

let sqs_messages: Vec<SqsSinkMessage> = messages
.iter()
.map(|msg| SqsSinkMessage::try_from(msg.clone()))
.collect::<Result<Vec<_>, _>>()?;

let sqs_sink_result = self.sink_messages(sqs_messages).await;
let message_count = sqs_messages.len();

if sqs_sink_result.is_err() {
return Err(Error::from(sqs_sink_result.err().unwrap()));
let start = Instant::now();
let sqs_sink_result = self.sink_messages(sqs_messages).await;
let elapsed = start.elapsed().as_micros() as f64;

sqs_metrics()
.producer
.publish_latency
.get_or_create(&labels)
.observe(elapsed);

if let Err(e) = sqs_sink_result {
sqs_metrics()
.producer
.publish_failure
.get_or_create(&labels)
.inc_by(message_count as u64);
return Err(Error::from(e));
}

let mut success_count = 0u64;
let mut failure_count = 0u64;

for sqs_response in sqs_sink_result?.iter() {
match &sqs_response.status {
Ok(_) => {
result.push(ResponseFromSink {
id: sqs_response.id.clone(),
status: ResponseStatusFromSink::Success,
});
success_count += 1;
}
Err(err) => {
result.push(ResponseFromSink {
id: sqs_response.id.clone(),
status: ResponseStatusFromSink::Failed(err.to_string()),
});
failure_count += 1;
}
}
}

sqs_metrics()
.producer
.publish_success
.get_or_create(&labels)
.inc_by(success_count);
sqs_metrics()
.producer
.publish_failure
.get_or_create(&labels)
.inc_by(failure_count);

Ok(result)
}
}
Expand Down
Loading