diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7a9424567a2..f702c140d2b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -12,6 +12,7 @@
 - Track a utilization metric for internal services. ([#4501](https://github.com/getsentry/relay/pull/4501))
 - Add new `relay-threading` crate with asynchronous thread pool. ([#4500](https://github.com/getsentry/relay/pull/4500))
+- Expose additional metrics through the internal relay metric endpoint. ([#4511](https://github.com/getsentry/relay/pull/4511))
 
 ## 25.2.0
 
diff --git a/relay-server/src/endpoints/autoscaling.rs b/relay-server/src/endpoints/autoscaling.rs
index 6f2adf0c057..f79b0e26c41 100644
--- a/relay-server/src/endpoints/autoscaling.rs
+++ b/relay-server/src/endpoints/autoscaling.rs
@@ -1,6 +1,7 @@
 use crate::http::StatusCode;
 use crate::service::ServiceState;
 use crate::services::autoscaling::{AutoscalingData, AutoscalingMessageKind};
+use std::fmt::Display;
 
 /// Returns internal metrics data for relay.
 pub async fn handle(state: ServiceState) -> (StatusCode, String) {
@@ -23,17 +24,22 @@ pub async fn handle(state: ServiceState) -> (StatusCode, String) {
 
 /// Simple function to serialize a well-known format into a Prometheus string.
 fn to_prometheus_string(data: &AutoscalingData) -> String {
-    let mut result = String::with_capacity(32);
-    result.push_str("memory_usage ");
-    result.push_str(&data.memory_usage.to_string());
-    result.push('\n');
-    result.push_str("up ");
-    result.push_str(&data.up.to_string());
-    result.push('\n');
+    let mut result = String::with_capacity(128);
+    append_data_row(&mut result, "memory_usage", data.memory_usage);
+    append_data_row(&mut result, "up", data.up);
+    append_data_row(&mut result, "item_count", data.item_count);
+    append_data_row(&mut result, "total_size", data.total_size);
     result
 }
 
+fn append_data_row(result: &mut String, label: &str, data: impl Display) {
+    result.push_str(label);
+    result.push(' ');
+    result.push_str(&data.to_string());
+    result.push('\n');
+}
+
 #[cfg(test)]
 mod test {
     use crate::services::autoscaling::AutoscalingData;
@@ -43,8 +49,13 @@ mod test {
         let data = AutoscalingData {
             memory_usage: 0.75,
             up: 1,
+            item_count: 10,
+            total_size: 30,
         };
         let result = super::to_prometheus_string(&data);
-        assert_eq!(result, "memory_usage 0.75\nup 1\n");
+        assert_eq!(
+            result,
+            "memory_usage 0.75\nup 1\nitem_count 10\ntotal_size 30\n"
+        );
     }
 }
diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs
index a621a0d4d11..ff4f32eee7f 100644
--- a/relay-server/src/service.rs
+++ b/relay-server/src/service.rs
@@ -72,7 +72,7 @@ pub struct Registry {
     pub envelope_buffer: PartitionedEnvelopeBuffer,
     pub project_cache_handle: ProjectCacheHandle,
 
-    pub keda: Addr<AutoscalingMetrics>,
+    pub autoscaling: Addr<AutoscalingMetrics>,
 }
 
 /// Constructs a Tokio [`relay_system::Runtime`] configured for running [services](relay_system::Service).
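Note on the endpoint refactor above: `append_data_row` covers all four metrics because it accepts any `impl Display`, so the `f32` gauge and the `u8`/`u64` counters share one code path that emits a `name value` line each. A standalone sketch of the same pattern (illustration only, not part of this patch):

```rust
use std::fmt::Display;

// Mirrors the helper added in `endpoints/autoscaling.rs`: one `label value\n`
// row per metric, regardless of the value's concrete type.
fn append_data_row(result: &mut String, label: &str, data: impl Display) {
    result.push_str(label);
    result.push(' ');
    result.push_str(&data.to_string());
    result.push('\n');
}

fn main() {
    let mut out = String::new();
    append_data_row(&mut out, "memory_usage", 0.75_f32);
    append_data_row(&mut out, "up", 1_u8);
    append_data_row(&mut out, "item_count", 10_u64);
    assert_eq!(out, "memory_usage 0.75\nup 1\nitem_count 10\n");
}
```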
@@ -189,8 +189,6 @@ impl ServiceState {
         let outcome_aggregator =
             services.start(OutcomeAggregator::new(&config, outcome_producer.clone()));
 
-        let keda = services.start(AutoscalingMetricService::new(memory_stat.clone()));
-
         let (global_config, global_config_rx) =
             GlobalConfigService::new(config.clone(), upstream_relay.clone());
         let global_config_handle = global_config.handle();
@@ -288,6 +286,11 @@ impl ServiceState {
             envelope_buffer.clone(),
         ));
 
+        let autoscaling = services.start(AutoscalingMetricService::new(
+            memory_stat.clone(),
+            envelope_buffer.clone(),
+        ));
+
         services.start(RelayStats::new(
             config.clone(),
             handle.clone(),
@@ -312,7 +315,7 @@ impl ServiceState {
             project_cache_handle,
             upstream_relay,
             envelope_buffer,
-            keda,
+            autoscaling,
         };
 
         let state = StateInner {
@@ -339,7 +342,7 @@ impl ServiceState {
     }
 
     pub fn autoscaling(&self) -> &Addr<AutoscalingMetrics> {
-        &self.inner.registry.keda
+        &self.inner.registry.autoscaling
     }
 
     /// Returns the V2 envelope buffer, if present.
diff --git a/relay-server/src/services/autoscaling.rs b/relay-server/src/services/autoscaling.rs
index f45b520c402..2001ed3be8d 100644
--- a/relay-server/src/services/autoscaling.rs
+++ b/relay-server/src/services/autoscaling.rs
@@ -1,3 +1,4 @@
+use crate::services::buffer::PartitionedEnvelopeBuffer;
 use crate::MemoryStat;
 use relay_system::{AsyncResponse, Controller, FromMessage, Interface, Sender, Service};
 use serde::Serialize;
@@ -5,12 +6,17 @@ use serde::Serialize;
 
 /// Service that tracks internal relay metrics so that they can be exposed.
 pub struct AutoscalingMetricService {
     memory_stat: MemoryStat,
+    envelope_buffer: PartitionedEnvelopeBuffer,
     up: u8,
 }
 
 impl AutoscalingMetricService {
-    pub fn new(memory_stat: MemoryStat) -> Self {
-        Self { memory_stat, up: 1 }
+    pub fn new(memory_stat: MemoryStat, envelope_buffer: PartitionedEnvelopeBuffer) -> Self {
+        Self {
+            memory_stat,
+            envelope_buffer,
+            up: 1,
+        }
     }
 }
 
@@ -28,7 +34,12 @@ impl Service for AutoscalingMetricService {
                 match message {
                     AutoscalingMetrics::Check(sender) => {
                         let memory_usage = self.memory_stat.memory();
-                        sender.send(AutoscalingData::new(memory_usage.used_percent(), self.up));
+                        sender.send(AutoscalingData {
+                            memory_usage: memory_usage.used_percent(),
+                            up: self.up,
+                            total_size: self.envelope_buffer.total_storage_size(),
+                            item_count: self.envelope_buffer.item_count(),
+                        });
                     }
                 }
             }
@@ -65,10 +76,6 @@ impl FromMessage<AutoscalingMessageKind> for AutoscalingMetrics {
 pub struct AutoscalingData {
     pub memory_usage: f32,
     pub up: u8,
-}
-
-impl AutoscalingData {
-    pub fn new(memory_usage: f32, up: u8) -> Self {
-        Self { memory_usage, up }
-    }
+    pub total_size: u64,
+    pub item_count: u64,
 }
diff --git a/relay-server/src/services/buffer/envelope_buffer/mod.rs b/relay-server/src/services/buffer/envelope_buffer/mod.rs
index b7591f743b3..f02634524d7 100644
--- a/relay-server/src/services/buffer/envelope_buffer/mod.rs
+++ b/relay-server/src/services/buffer/envelope_buffer/mod.rs
@@ -3,9 +3,6 @@ use std::collections::BTreeSet;
 use std::convert::Infallible;
 use std::error::Error;
 use std::mem;
-use std::sync::atomic::AtomicI64;
-use std::sync::atomic::Ordering as AtomicOrdering;
-use std::sync::Arc;
 use std::time::Duration;
 
 use chrono::{DateTime, Utc};
@@ -165,6 +162,24 @@ impl PolymorphicEnvelopeBuffer {
     }
 
+    /// Returns the total number of envelopes that have been spooled since startup. It does
+    /// not include envelopes that already existed in a persistent spool before startup.
+    pub fn item_count(&self) -> u64 {
+        match self {
+            Self::Sqlite(buffer) => buffer.tracked_count,
+            Self::InMemory(buffer) => buffer.tracked_count,
+        }
+    }
+
+    /// Returns the total number of bytes that the spooler storage uses, or `None` if the
+    /// number cannot be reliably determined.
+    pub fn total_size(&self) -> Option<u64> {
+        match self {
+            Self::Sqlite(buffer) => buffer.stack_provider.total_size(),
+            Self::InMemory(buffer) => buffer.stack_provider.total_size(),
+        }
+    }
+
     /// Shuts down the [`PolymorphicEnvelopeBuffer`].
     pub async fn shutdown(&mut self) -> bool {
         // Currently, we want to flush the buffer only for disk, since the in memory implementation
@@ -228,7 +243,13 @@ struct EnvelopeBuffer {
     /// count might not succeed if it takes more than a set timeout. For example, if we load the
     /// count of all envelopes from disk, and it takes more than the time we set, we will mark the
     /// initial count as 0 and just count incoming and outgoing envelopes from the buffer.
-    total_count: Arc<AtomicI64>,
+    total_count: i64,
+    /// The total count of envelopes that the buffer is working with, ignoring envelopes that
+    /// were previously stored on disk.
+    ///
+    /// On startup this will always be 0 and will only count incoming envelopes. If a reliable
+    /// count of currently buffered envelopes is required, prefer this over `total_count`.
+    tracked_count: u64,
     /// Whether the count initialization succeeded or not.
     ///
     /// This boolean is just used for tagging the metric that tracks the total count of envelopes
@@ -245,7 +266,8 @@ impl EnvelopeBuffer<MemoryStackProvider> {
             stacks_by_project: Default::default(),
             priority_queue: Default::default(),
             stack_provider: MemoryStackProvider::new(memory_checker),
-            total_count: Arc::new(AtomicI64::new(0)),
+            total_count: 0,
+            tracked_count: 0,
             total_count_initialized: false,
             partition_tag: partition_id.to_string(),
         }
@@ -260,7 +282,8 @@ impl EnvelopeBuffer<SqliteStackProvider> {
             stacks_by_project: Default::default(),
             priority_queue: Default::default(),
             stack_provider: SqliteStackProvider::new(partition_id, config).await?,
-            total_count: Arc::new(AtomicI64::new(0)),
+            total_count: 0,
+            tracked_count: 0,
             total_count_initialized: false,
             partition_tag: partition_id.to_string(),
         })
@@ -318,7 +341,8 @@ where
             prio.received_at = received_at;
         });
 
-        self.total_count.fetch_add(1, AtomicOrdering::SeqCst);
+        self.total_count += 1;
+        self.tracked_count += 1;
         self.track_total_count();
 
         Ok(())
@@ -385,7 +409,8 @@ where
         // We are fine with the count going negative, since it represents that more data was
        // popped than was initially counted, meaning that we had a wrong total count from
        // initialization.
-        self.total_count.fetch_sub(1, AtomicOrdering::SeqCst);
+        self.total_count -= 1;
+        self.tracked_count = self.tracked_count.saturating_sub(1);
         self.track_total_count();
 
         Ok(Some(envelope))
@@ -529,8 +554,7 @@ where
             .await;
         match total_count {
             Ok(total_count) => {
-                self.total_count
-                    .store(total_count as i64, AtomicOrdering::SeqCst);
+                self.total_count = total_count as i64;
                 self.total_count_initialized = true;
             }
             Err(error) => {
@@ -546,7 +570,7 @@ where
     }
 
     /// Emits a metric to track the total count of envelopes that are in the envelope buffer.
     fn track_total_count(&self) {
-        let total_count = self.total_count.load(AtomicOrdering::SeqCst) as f64;
+        let total_count = self.total_count as f64;
         let initialized = match self.total_count_initialized {
             true => "true",
             false => "false",
diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs
index b2ce523f609..30df8ea8f58 100644
--- a/relay-server/src/services/buffer/mod.rs
+++ b/relay-server/src/services/buffer/mod.rs
@@ -2,8 +2,8 @@
 use std::error::Error;
 use std::num::NonZeroU8;
-use std::sync::atomic::AtomicBool;
 use std::sync::atomic::Ordering;
+use std::sync::atomic::{AtomicBool, AtomicU64};
 use std::sync::Arc;
 use std::time::Duration;
 
@@ -145,6 +145,17 @@ impl PartitionedEnvelopeBuffer {
         self.buffers.iter().all(|buffer| buffer.has_capacity())
     }
 
+    pub fn item_count(&self) -> u64 {
+        self.buffers.iter().map(|buffer| buffer.item_count()).sum()
+    }
+
+    pub fn total_storage_size(&self) -> u64 {
+        self.buffers
+            .iter()
+            .map(|buffer| buffer.storage_size())
+            .sum()
+    }
+
     /// Builds a hasher with fixed seeds for consistent partitioning across Relay instances.
     fn build_hasher() -> RandomState {
         const K0: u64 = 0xd34db33f11223344;
@@ -156,6 +167,13 @@ impl PartitionedEnvelopeBuffer {
     }
 }
 
+#[derive(Debug)]
+pub struct EnvelopeBufferMetrics {
+    has_capacity: AtomicBool,
+    item_count: AtomicU64,
+    storage_size: AtomicU64,
+}
+
 /// Contains the services [`Addr`] and a watch channel to observe its state.
 ///
 /// This allows outside observers to check the capacity without having to send a message.
 #[derive(Debug, Clone)]
 pub struct ObservableEnvelopeBuffer {
     addr: Addr<EnvelopeBuffer>,
-    has_capacity: Arc<AtomicBool>,
+    metrics: Arc<EnvelopeBufferMetrics>,
 }
 
 impl ObservableEnvelopeBuffer {
@@ -176,7 +194,15 @@ impl ObservableEnvelopeBuffer {
 
     /// Returns `true` if the buffer has the capacity to accept more elements.
     pub fn has_capacity(&self) -> bool {
-        self.has_capacity.load(Ordering::Relaxed)
+        self.metrics.has_capacity.load(Ordering::Relaxed)
+    }
+
+    pub fn item_count(&self) -> u64 {
+        self.metrics.item_count.load(Ordering::Relaxed)
+    }
+
+    pub fn storage_size(&self) -> u64 {
+        self.metrics.storage_size.load(Ordering::Relaxed)
     }
 }
 
@@ -199,7 +225,7 @@ pub struct EnvelopeBufferService {
     memory_stat: MemoryStat,
     global_config_rx: watch::Receiver<global_config::Status>,
     services: Services,
-    has_capacity: Arc<AtomicBool>,
+    metrics: Arc<EnvelopeBufferMetrics>,
     sleep: Duration,
 }
 
@@ -224,18 +250,22 @@ impl EnvelopeBufferService {
             memory_stat,
             global_config_rx,
             services,
-            has_capacity: Arc::new(AtomicBool::new(true)),
+            metrics: Arc::new(EnvelopeBufferMetrics {
+                has_capacity: AtomicBool::new(true),
+                item_count: AtomicU64::new(0),
+                storage_size: AtomicU64::new(0),
+            }),
             sleep: Duration::ZERO,
         }
     }
 
-    /// Returns both the [`Addr`] to this service, and a reference to the capacity flag.
+    /// Returns both the [`Addr`] to this service and a handle to the spooler metrics.
     pub fn start_in(self, services: &dyn ServiceSpawn) -> ObservableEnvelopeBuffer {
-        let has_capacity = self.has_capacity.clone();
+        let metrics = self.metrics.clone();
 
         let addr = services.start(self);
-        ObservableEnvelopeBuffer { addr, has_capacity }
+        ObservableEnvelopeBuffer { addr, metrics }
    }
 
     /// Wait for the configured amount of time and make sure the project cache is ready to receive.
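The `EnvelopeBufferMetrics` bundle above replaces the single `has_capacity` flag: the buffer service is the sole writer and observers poll the atomics without sending a message, so `Ordering::Relaxed` suffices because each load only needs a reasonably fresh value, not a consistent snapshot across fields. A minimal standalone sketch of that sharing pattern (simplified, not the actual service wiring):

```rust
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;

// Shared metrics bundle, mirroring the struct added in the diff above.
#[derive(Debug)]
struct EnvelopeBufferMetrics {
    has_capacity: AtomicBool,
    item_count: AtomicU64,
    storage_size: AtomicU64,
}

fn main() {
    let metrics = Arc::new(EnvelopeBufferMetrics {
        has_capacity: AtomicBool::new(true),
        item_count: AtomicU64::new(0),
        storage_size: AtomicU64::new(0),
    });

    // Writer side, as in `update_observable_state`: store the latest values.
    metrics.item_count.store(10, Ordering::Relaxed);
    metrics.storage_size.store(30_000, Ordering::Relaxed);

    // Reader side, as in `ObservableEnvelopeBuffer`: poll without messaging.
    let observer = Arc::clone(&metrics);
    assert!(observer.has_capacity.load(Ordering::Relaxed));
    assert_eq!(observer.item_count.load(Ordering::Relaxed), 10);
}
```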
@@ -515,8 +545,15 @@ impl EnvelopeBufferService {
     }
 
     fn update_observable_state(&self, buffer: &mut PolymorphicEnvelopeBuffer) {
-        self.has_capacity
+        self.metrics
+            .has_capacity
             .store(buffer.has_capacity(), Ordering::Relaxed);
+        self.metrics
+            .storage_size
+            .store(buffer.total_size().unwrap_or(0), Ordering::Relaxed);
+        self.metrics
+            .item_count
+            .store(buffer.item_count(), Ordering::Relaxed);
     }
 }
 
@@ -626,8 +663,22 @@ impl Service for EnvelopeBufferService {
     }
 }
 
+/// The spooler uses internal time-based mechanics, so to keep the tests from actually waiting
+/// it is best to use `#[tokio::test(start_paused = true)]`. For memory-based spooling this
+/// just works.
+///
+/// However, tests for the sqlite spooler will not behave correctly with `start_paused`,
+/// because the sqlite pool uses the timeout provided by tokio while establishing connections,
+/// but the work that happens during connection runs outside of tokio in its own threadpool.
+/// While connecting, the tokio runtime has no work, which triggers the [auto-advance](https://docs.rs/tokio/latest/tokio/time/fn.pause.html#auto-advance)
+/// feature of the runtime; this causes the timeout to resolve immediately, preventing
+/// the connection from ever being established (`SqliteStore(SqlxSetupFailed(PoolTimedOut))`).
+///
+/// To test sqlite-based spooling it is necessary to manually pause time using
+/// `tokio::time::pause` *after* the connection is established.
 #[cfg(test)]
 mod tests {
+    use super::*;
     use crate::services::projects::project::{ProjectInfo, ProjectState};
     use crate::testutils::new_envelope;
     use crate::MemoryStat;
@@ -640,8 +691,6 @@ mod tests {
     use tokio::sync::mpsc;
     use uuid::Uuid;
 
-    use super::*;
-
     struct EnvelopeBufferServiceResult {
         service: EnvelopeBufferService,
         global_tx: watch::Sender<global_config::Status>,
@@ -698,10 +747,10 @@ mod tests {
             outcome_aggregator_rx: _outcome_aggregator_rx,
         } = envelope_buffer_service(None, global_config::Status::Pending);
 
-        service.has_capacity.store(false, Ordering::Relaxed);
+        service.metrics.has_capacity.store(false, Ordering::Relaxed);
 
-        let ObservableEnvelopeBuffer { has_capacity, .. } = service.start_in(&TokioServiceSpawn);
-        assert!(!has_capacity.load(Ordering::Relaxed));
+        let ObservableEnvelopeBuffer { metrics, .. } = service.start_in(&TokioServiceSpawn);
+        assert!(!metrics.has_capacity.load(Ordering::Relaxed));
 
         tokio::time::advance(Duration::from_millis(100)).await;
 
@@ -710,7 +759,7 @@ mod tests {
 
         tokio::time::advance(Duration::from_millis(100)).await;
 
-        assert!(has_capacity.load(Ordering::Relaxed));
+        assert!(metrics.has_capacity.load(Ordering::Relaxed));
     }
 
     #[tokio::test(start_paused = true)]
@@ -790,7 +839,7 @@ mod tests {
         assert_eq!(envelope_processor_rx.len(), 0);
     }
 
-    #[tokio::test(start_paused = true)]
+    #[tokio::test]
     async fn pop_requires_memory_capacity() {
         let EnvelopeBufferServiceResult {
             service,
@@ -814,6 +863,8 @@ mod tests {
         );
 
         let addr = service.start_detached();
+        tokio::time::sleep(Duration::from_millis(50)).await;
+        tokio::time::pause();
 
         let envelope = new_envelope(false, "foo");
         let project_key = envelope.meta().public_key();
@@ -825,6 +876,41 @@ mod tests {
         assert_eq!(envelope_processor_rx.len(), 0);
     }
 
+    #[tokio::test]
+    async fn test_sqlite_metrics() {
+        let EnvelopeBufferServiceResult {
+            service,
+            envelope_processor_rx: _envelope_processor_rx,
+            project_cache_handle: _project_cache_handle,
+            outcome_aggregator_rx: _outcome_aggregator_rx,
+            global_tx: _global_tx,
+        } = envelope_buffer_service(
+            Some(serde_json::json!({
+                "spool": {
+                    "envelopes": {
+                        "path": std::env::temp_dir().join(Uuid::new_v4().to_string()),
+                    }
+                }
+            })),
+            global_config::Status::Pending,
+        );
+
+        let addr = service.start_in(&TokioServiceSpawn);
+        tokio::time::sleep(Duration::from_millis(200)).await;
+        tokio::time::pause();
+
+        assert_eq!(addr.metrics.item_count.load(Ordering::Relaxed), 0);
+
+        for _ in 0..10 {
+            let envelope = new_envelope(false, "foo");
+            addr.addr().send(EnvelopeBuffer::Push(envelope.clone()));
+        }
+
+        tokio::time::sleep(Duration::from_millis(1000)).await;
+
+        assert_eq!(addr.metrics.item_count.load(Ordering::Relaxed), 10);
+    }
+
     #[tokio::test(start_paused = true)]
     async fn old_envelope_is_dropped() {
         let EnvelopeBufferServiceResult {
diff --git a/relay-server/src/services/buffer/stack_provider/memory.rs b/relay-server/src/services/buffer/stack_provider/memory.rs
index e206e15fab3..514d38a11cd 100644
--- a/relay-server/src/services/buffer/stack_provider/memory.rs
+++ b/relay-server/src/services/buffer/stack_provider/memory.rs
@@ -39,6 +39,11 @@ impl StackProvider for MemoryStackProvider {
         0
     }
 
+    fn total_size(&self) -> Option<u64> {
+        // We can't reliably tell how much memory is used, so just return `None`.
+        None
+    }
+
     fn stack_type<'a>(&self) -> &'a str {
         "memory"
     }
diff --git a/relay-server/src/services/buffer/stack_provider/mod.rs b/relay-server/src/services/buffer/stack_provider/mod.rs
index 715d70c436c..753a6469b83 100644
--- a/relay-server/src/services/buffer/stack_provider/mod.rs
+++ b/relay-server/src/services/buffer/stack_provider/mod.rs
@@ -58,6 +58,10 @@ pub trait StackProvider: std::fmt::Debug {
     /// Returns the total count of the store used by this [`StackProvider`].
     fn store_total_count(&self) -> impl Future<Output = u64>;
 
+    /// Returns the number of bytes the storage occupies, or `None` if no
+    /// reliable information can be provided.
+    fn total_size(&self) -> Option<u64>;
+
     /// Returns the string representation of the stack type offered by this [`StackProvider`].
     fn stack_type<'a>(&self) -> &'a str;
diff --git a/relay-server/src/services/buffer/stack_provider/sqlite.rs b/relay-server/src/services/buffer/stack_provider/sqlite.rs
index 51a548d6de1..ac9e0aa585d 100644
--- a/relay-server/src/services/buffer/stack_provider/sqlite.rs
+++ b/relay-server/src/services/buffer/stack_provider/sqlite.rs
@@ -96,6 +96,10 @@ impl StackProvider for SqliteStackProvider {
         })
     }
 
+    fn total_size(&self) -> Option<u64> {
+        Some(self.envelope_store.usage())
+    }
+
     fn stack_type<'a>(&self) -> &'a str {
         "sqlite"
     }
diff --git a/tests/integration/fixtures/relay.py b/tests/integration/fixtures/relay.py
index bc335f39bde..35488d43811 100644
--- a/tests/integration/fixtures/relay.py
+++ b/tests/integration/fixtures/relay.py
@@ -58,6 +58,9 @@ def shutdown(self, sig=signal.SIGKILL):
             self.process.kill()
             raise
 
+    def send_signal(self, sig):
+        self.process.send_signal(sig)
+
 
 @pytest.fixture
 def get_relay_binary():
diff --git a/tests/integration/test_autoscaling.py b/tests/integration/test_autoscaling.py
index 0aad6b3199c..e7e5ee9fbe8 100644
--- a/tests/integration/test_autoscaling.py
+++ b/tests/integration/test_autoscaling.py
@@ -1,10 +1,74 @@
 """
-Tests the keda endpoint.
+Tests the autoscaling endpoint.
 """
+
+import os
+import signal
+import tempfile
+from time import sleep
 
 
-def test_basic_keda(mini_sentry, relay):
+def parse_prometheus(input_string):
+    result = {}
+    for line in input_string.splitlines():
+        parts = line.split(" ")
+        result[parts[0]] = parts[1]
+    return result
+
+
+def test_basic_autoscaling_endpoint(mini_sentry, relay):
     relay = relay(mini_sentry)
+    response = relay.get("/api/relay/autoscaling/")
+    parsed = parse_prometheus(response.text)
+    assert response.status_code == 200
+    assert int(parsed["up"]) == 1
+
+
+def test_sqlite_spooling_metrics(mini_sentry, relay):
+    # Create a sqlite db path inside a fresh temporary directory
+    db_file_path = os.path.join(tempfile.mkdtemp(), "database.db")
+
+    project_id = 42
+    mini_sentry.add_basic_project_config(project_id)
+
+    relay = relay(
+        mini_sentry,
+        {
+            "spool": {"batch_size_bytes": 0, "envelopes": {"path": db_file_path}},
+        },
+    )
+
+    # Send SIGUSR1 to disable unspooling
+    relay.send_signal(signal.SIGUSR1)
+    sleep(0.5)  # Give time for the signal to be processed
+
+    # Send events while unspooling is disabled
+    for _ in range(200):
+        relay.send_event(project_id)
+
+    response = relay.get("/api/relay/autoscaling/")
+    assert response.status_code == 200
+    body = parse_prometheus(response.text)
+    assert int(body["item_count"]) == 200
+    assert int(body["up"]) == 1
+    assert int(body["total_size"]) > 30000
+
+
+def test_memory_spooling_metrics(mini_sentry, relay):
+    project_id = 42
+    mini_sentry.add_basic_project_config(project_id)
+
+    relay = relay(mini_sentry)
+
+    relay.send_signal(signal.SIGUSR1)
+    sleep(0.5)
+
+    for _ in range(200):
+        relay.send_event(project_id)
+
     response = relay.get("/api/relay/autoscaling/")
     assert response.status_code == 200
-    assert "up 1" in response.text
+    body = parse_prometheus(response.text)
+    assert int(body["item_count"]) == 200
+    assert int(body["up"]) == 1
+    assert int(body["total_size"]) == 0
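For reference, the integration tests above can assert an exact `item_count` because the new `tracked_count` starts at zero on every startup and only follows pushes and pops in the current process, while `total_count` may be seeded from a persistent spool. A minimal model of that bookkeeping (a sketch of the counter semantics, not the buffer's actual implementation):

```rust
// Models the two counters added to `EnvelopeBuffer` in this patch.
struct Counters {
    // May be seeded from a disk count at startup; allowed to go negative when
    // the initial count was wrong.
    total_count: i64,
    // Always starts at 0 and never underflows; backs the `item_count` metric.
    tracked_count: u64,
}

impl Counters {
    fn push(&mut self) {
        self.total_count += 1;
        self.tracked_count += 1;
    }

    fn pop(&mut self) {
        self.total_count -= 1;
        self.tracked_count = self.tracked_count.saturating_sub(1);
    }
}

fn main() {
    // Suppose 50 envelopes already existed in a persistent spool at startup.
    let mut c = Counters { total_count: 50, tracked_count: 0 };
    for _ in 0..200 {
        c.push();
    }
    assert_eq!(c.tracked_count, 200); // reported by the endpoint as item_count
    assert_eq!(c.total_count, 250); // includes the pre-existing envelopes
    c.pop();
    assert_eq!(c.tracked_count, 199);
}
```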