feat(relay): add metrics for total items spooled and their size (#4511)
This PR extends the autoscaling endpoint with data about how many items are spooled and how many bytes they use; the byte size is currently only available for sqlite spooling.
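For reference, the unit test added in this commit asserts the following plain-text body for the extended endpoint; the two new rows are appended after the existing `memory_usage` and `up` rows:

```
memory_usage 0.75
up 1
item_count 10
total_size 30
```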
Litarnus authored Feb 26, 2025
1 parent 7eea540 commit 3fe3975
Showing 11 changed files with 264 additions and 52 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -12,6 +12,7 @@

- Track an utilization metric for internal services. ([#4501](https://github.com/getsentry/relay/pull/4501))
- Add new `relay-threading` crate with asynchronous thread pool. ([#4500](https://github.com/getsentry/relay/pull/4500))
- Expose additional metrics through the internal relay metric endpoint. ([#4511](https://github.com/getsentry/relay/pull/4511))

## 25.2.0

27 changes: 19 additions & 8 deletions relay-server/src/endpoints/autoscaling.rs
@@ -1,6 +1,7 @@
use crate::http::StatusCode;
use crate::service::ServiceState;
use crate::services::autoscaling::{AutoscalingData, AutoscalingMessageKind};
use std::fmt::Display;

/// Returns internal metrics data for relay.
pub async fn handle(state: ServiceState) -> (StatusCode, String) {
@@ -23,17 +24,22 @@ pub async fn handle(state: ServiceState) -> (StatusCode, String) {

/// Simple function to serialize a well-known format into a prometheus string.
fn to_prometheus_string(data: &AutoscalingData) -> String {
let mut result = String::with_capacity(32);
result.push_str("memory_usage ");
result.push_str(&data.memory_usage.to_string());
result.push('\n');
result.push_str("up ");
result.push_str(&data.up.to_string());
result.push('\n');
let mut result = String::with_capacity(128);

append_data_row(&mut result, "memory_usage", data.memory_usage);
append_data_row(&mut result, "up", data.up);
append_data_row(&mut result, "item_count", data.item_count);
append_data_row(&mut result, "total_size", data.total_size);
result
}

fn append_data_row(result: &mut String, label: &str, data: impl Display) {
result.push_str(label);
result.push(' ');
result.push_str(&data.to_string());
result.push('\n');
}

#[cfg(test)]
mod test {
use crate::services::autoscaling::AutoscalingData;
@@ -43,8 +49,13 @@ mod test {
let data = AutoscalingData {
memory_usage: 0.75,
up: 1,
item_count: 10,
total_size: 30,
};
let result = super::to_prometheus_string(&data);
assert_eq!(result, "memory_usage 0.75\nup 1\n");
assert_eq!(
result,
"memory_usage 0.75\nup 1\nitem_count 10\ntotal_size 30\n"
);
}
}
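As a standalone sketch of the helper pattern introduced above: `append_data_row` accepts any `impl Display`, so the float gauge, the `u8` flag, and the new `u64` counters all share one formatting path. The values are taken from the unit test; the `main` wrapper is only there to make the snippet runnable.

```rust
use std::fmt::Display;

/// Appends a single `name value\n` row in the Prometheus text exposition style.
fn append_data_row(result: &mut String, label: &str, data: impl Display) {
    result.push_str(label);
    result.push(' ');
    result.push_str(&data.to_string());
    result.push('\n');
}

fn main() {
    let mut body = String::with_capacity(128);
    append_data_row(&mut body, "memory_usage", 0.75_f32);
    append_data_row(&mut body, "up", 1_u8);
    append_data_row(&mut body, "item_count", 10_u64);
    append_data_row(&mut body, "total_size", 30_u64);
    assert_eq!(body, "memory_usage 0.75\nup 1\nitem_count 10\ntotal_size 30\n");
    print!("{body}");
}
```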
13 changes: 8 additions & 5 deletions relay-server/src/service.rs
@@ -72,7 +72,7 @@ pub struct Registry {
pub envelope_buffer: PartitionedEnvelopeBuffer,

pub project_cache_handle: ProjectCacheHandle,
pub keda: Addr<AutoscalingMetrics>,
pub autoscaling: Addr<AutoscalingMetrics>,
}

/// Constructs a Tokio [`relay_system::Runtime`] configured for running [services](relay_system::Service).
@@ -189,8 +189,6 @@ impl ServiceState {
let outcome_aggregator =
services.start(OutcomeAggregator::new(&config, outcome_producer.clone()));

let keda = services.start(AutoscalingMetricService::new(memory_stat.clone()));

let (global_config, global_config_rx) =
GlobalConfigService::new(config.clone(), upstream_relay.clone());
let global_config_handle = global_config.handle();
@@ -288,6 +286,11 @@ impl ServiceState {
envelope_buffer.clone(),
));

let autoscaling = services.start(AutoscalingMetricService::new(
memory_stat.clone(),
envelope_buffer.clone(),
));

services.start(RelayStats::new(
config.clone(),
handle.clone(),
@@ -312,7 +315,7 @@
project_cache_handle,
upstream_relay,
envelope_buffer,
keda,
autoscaling,
};

let state = StateInner {
@@ -339,7 +342,7 @@
}

pub fn autoscaling(&self) -> &Addr<AutoscalingMetrics> {
&self.inner.registry.keda
&self.inner.registry.autoscaling
}

/// Returns the V2 envelope buffer, if present.
25 changes: 16 additions & 9 deletions relay-server/src/services/autoscaling.rs
@@ -1,16 +1,22 @@
use crate::services::buffer::PartitionedEnvelopeBuffer;
use crate::MemoryStat;
use relay_system::{AsyncResponse, Controller, FromMessage, Interface, Sender, Service};
use serde::Serialize;

/// Service that tracks internal relay metrics so that they can be exposed.
pub struct AutoscalingMetricService {
memory_stat: MemoryStat,
envelope_buffer: PartitionedEnvelopeBuffer,
up: u8,
}

impl AutoscalingMetricService {
pub fn new(memory_stat: MemoryStat) -> Self {
Self { memory_stat, up: 1 }
pub fn new(memory_stat: MemoryStat, envelope_buffer: PartitionedEnvelopeBuffer) -> Self {
Self {
memory_stat,
envelope_buffer,
up: 1,
}
}
}

@@ -28,7 +34,12 @@ impl Service for AutoscalingMetricService {
match message {
AutoscalingMetrics::Check(sender) => {
let memory_usage = self.memory_stat.memory();
sender.send(AutoscalingData::new(memory_usage.used_percent(), self.up));
sender.send(AutoscalingData {
memory_usage: memory_usage.used_percent(),
up: self.up,
total_size: self.envelope_buffer.total_storage_size(),
item_count: self.envelope_buffer.item_count()
});
}
}
}
@@ -65,10 +76,6 @@ impl FromMessage<AutoscalingMessageKind> for AutoscalingMetrics {
pub struct AutoscalingData {
pub memory_usage: f32,
pub up: u8,
}

impl AutoscalingData {
pub fn new(memory_usage: f32, up: u8) -> Self {
Self { memory_usage, up }
}
pub total_size: u64,
pub item_count: u64,
}
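The handler above follows a request/response pattern: the endpoint sends `AutoscalingMessageKind::Check` together with a reply `Sender`, and the service answers with a freshly built `AutoscalingData`. Below is a rough, self-contained approximation of that shape using plain `tokio` channels; the real code goes through `relay_system`'s `AsyncResponse`/`Sender`, so the names and channel types here are stand-ins, assuming `tokio` with the `rt-multi-thread`, `macros`, and `sync` features.

```rust
use tokio::sync::{mpsc, oneshot};

/// Snapshot returned to the autoscaling endpoint (mirrors the fields added in this commit).
#[derive(Debug)]
struct AutoscalingData {
    memory_usage: f32,
    up: u8,
    total_size: u64,
    item_count: u64,
}

/// Stand-in for `AutoscalingMessageKind::Check`, carrying a one-shot reply channel.
enum Message {
    Check(oneshot::Sender<AutoscalingData>),
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::unbounded_channel::<Message>();

    // The "service" loop: answer every Check with a current snapshot.
    tokio::spawn(async move {
        while let Some(Message::Check(reply)) = rx.recv().await {
            let _ = reply.send(AutoscalingData {
                memory_usage: 0.75,
                up: 1,
                total_size: 30,
                item_count: 10,
            });
        }
    });

    // The "endpoint" side: ask and await the snapshot.
    let (reply_tx, reply_rx) = oneshot::channel();
    tx.send(Message::Check(reply_tx)).unwrap();
    let data = reply_rx.await.unwrap();
    println!("{data:?}");
}
```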
46 changes: 35 additions & 11 deletions relay-server/src/services/buffer/envelope_buffer/mod.rs
@@ -3,9 +3,6 @@ use std::collections::BTreeSet;
use std::convert::Infallible;
use std::error::Error;
use std::mem;
use std::sync::atomic::AtomicI64;
use std::sync::atomic::Ordering as AtomicOrdering;
use std::sync::Arc;
use std::time::Duration;

use chrono::{DateTime, Utc};
@@ -165,6 +162,24 @@ impl PolymorphicEnvelopeBuffer {
}
}

/// Returns the total number of envelopes that have been spooled since startup. It does
/// not include the count that existed in a persistent spooler before.
pub fn item_count(&self) -> u64 {
match self {
Self::Sqlite(buffer) => buffer.tracked_count,
Self::InMemory(buffer) => buffer.tracked_count,
}
}

/// Returns the total number of bytes that the spooler storage uses or `None` if the number
/// cannot be reliably determined.
pub fn total_size(&self) -> Option<u64> {
match self {
Self::Sqlite(buffer) => buffer.stack_provider.total_size(),
Self::InMemory(buffer) => buffer.stack_provider.total_size(),
}
}

/// Shuts down the [`PolymorphicEnvelopeBuffer`].
pub async fn shutdown(&mut self) -> bool {
// Currently, we want to flush the buffer only for disk, since the in memory implementation
@@ -228,7 +243,13 @@ struct EnvelopeBuffer<P: StackProvider> {
/// count might not succeed if it takes more than a set timeout. For example, if we load the
/// count of all envelopes from disk, and it takes more than the time we set, we will mark the
/// initial count as 0 and just count incoming and outgoing envelopes from the buffer.
total_count: Arc<AtomicI64>,
total_count: i64,
/// The total count of envelopes that the buffer is working with ignoring envelopes that
/// were previously stored on disk.
///
/// On startup this will always be 0 and will only count incoming envelopes. If a reliable
/// count of currently buffered envelopes is required, prefer this over `total_count`.
tracked_count: u64,
/// Whether the count initialization succeeded or not.
///
/// This boolean is just used for tagging the metric that tracks the total count of envelopes
@@ -245,7 +266,8 @@ impl EnvelopeBuffer<MemoryStackProvider> {
stacks_by_project: Default::default(),
priority_queue: Default::default(),
stack_provider: MemoryStackProvider::new(memory_checker),
total_count: Arc::new(AtomicI64::new(0)),
total_count: 0,
tracked_count: 0,
total_count_initialized: false,
partition_tag: partition_id.to_string(),
}
@@ -260,7 +282,8 @@ impl EnvelopeBuffer<SqliteStackProvider> {
stacks_by_project: Default::default(),
priority_queue: Default::default(),
stack_provider: SqliteStackProvider::new(partition_id, config).await?,
total_count: Arc::new(AtomicI64::new(0)),
total_count: 0,
tracked_count: 0,
total_count_initialized: false,
partition_tag: partition_id.to_string(),
})
@@ -318,7 +341,8 @@
prio.received_at = received_at;
});

self.total_count.fetch_add(1, AtomicOrdering::SeqCst);
self.total_count += 1;
self.tracked_count += 1;
self.track_total_count();

Ok(())
@@ -385,7 +409,8 @@
// We are fine with the count going negative, since it represents that more data was popped,
// than it was initially counted, meaning that we had a wrong total count from
// initialization.
self.total_count.fetch_sub(1, AtomicOrdering::SeqCst);
self.total_count -= 1;
self.tracked_count = self.tracked_count.saturating_sub(1);
self.track_total_count();

Ok(Some(envelope))
@@ -529,8 +554,7 @@
.await;
match total_count {
Ok(total_count) => {
self.total_count
.store(total_count as i64, AtomicOrdering::SeqCst);
self.total_count = total_count as i64;
self.total_count_initialized = true;
}
Err(error) => {
@@ -546,7 +570,7 @@

/// Emits a metric to track the total count of envelopes that are in the envelope buffer.
fn track_total_count(&self) {
let total_count = self.total_count.load(AtomicOrdering::SeqCst) as f64;
let total_count = self.total_count as f64;
let initialized = match self.total_count_initialized {
true => "true",
false => "false",
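The hunk above keeps two counters side by side: `total_count` stays signed because the value loaded from disk at startup may under-count what is actually spooled, so popping can legitimately push it below zero, whereas the new `tracked_count` only counts envelopes pushed by this process and is clamped at zero with `saturating_sub`. A small standalone illustration of that difference (not the actual buffer code):

```rust
/// Standalone illustration of the two counters kept by the envelope buffer.
struct Counters {
    /// May go negative if the count loaded at startup under-reported what was on disk.
    total_count: i64,
    /// Only counts envelopes pushed by this process, so it never drops below zero.
    tracked_count: u64,
}

impl Counters {
    fn push(&mut self) {
        self.total_count += 1;
        self.tracked_count += 1;
    }

    fn pop(&mut self) {
        self.total_count -= 1;
        self.tracked_count = self.tracked_count.saturating_sub(1);
    }
}

fn main() {
    // Pretend initialization reported 0 envelopes even though some were already spooled on disk.
    let mut c = Counters { total_count: 0, tracked_count: 0 };
    c.push(); // one new envelope arrives
    c.pop();  // it is popped again
    c.pop();  // an old, previously uncounted envelope is popped from disk
    assert_eq!(c.total_count, -1);  // the signed counter reflects the under-count
    assert_eq!(c.tracked_count, 0); // the tracked counter saturates instead
}
```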