From 63a8b099e18a19e16c62e34c13bf634077e87bab Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Wed, 9 Oct 2024 18:42:59 +0200 Subject: [PATCH] revert: Spooler changes (#4131) --- relay-server/benches/benches.rs | 78 +- relay-server/src/lib.rs | 15 +- relay-server/src/services/buffer/common.rs | 34 - .../services/buffer/envelope_buffer/mod.rs | 550 +++++++++----- .../buffer/envelope_repository/memory.rs | 75 -- .../buffer/envelope_repository/mod.rs | 136 ---- .../buffer/envelope_repository/sqlite.rs | 681 ------------------ .../services/buffer/envelope_stack/memory.rs | 35 + .../src/services/buffer/envelope_stack/mod.rs | 26 + .../services/buffer/envelope_stack/sqlite.rs | 496 +++++++++++++ relay-server/src/services/buffer/mod.rs | 64 +- .../services/buffer/stack_provider/memory.rs | 52 ++ .../src/services/buffer/stack_provider/mod.rs | 69 ++ .../services/buffer/stack_provider/sqlite.rs | 206 ++++++ relay-server/src/services/buffer/testutils.rs | 24 +- relay-server/src/statsd.rs | 14 +- tests/integration/test_healthchecks.py | 2 +- 17 files changed, 1331 insertions(+), 1226 deletions(-) delete mode 100644 relay-server/src/services/buffer/envelope_repository/memory.rs delete mode 100644 relay-server/src/services/buffer/envelope_repository/mod.rs delete mode 100644 relay-server/src/services/buffer/envelope_repository/sqlite.rs create mode 100644 relay-server/src/services/buffer/envelope_stack/memory.rs create mode 100644 relay-server/src/services/buffer/envelope_stack/mod.rs create mode 100644 relay-server/src/services/buffer/envelope_stack/sqlite.rs create mode 100644 relay-server/src/services/buffer/stack_provider/memory.rs create mode 100644 relay-server/src/services/buffer/stack_provider/mod.rs create mode 100644 relay-server/src/services/buffer/stack_provider/sqlite.rs diff --git a/relay-server/benches/benches.rs b/relay-server/benches/benches.rs index 192aabdfb9d..045ad1bd937 100644 --- a/relay-server/benches/benches.rs +++ b/relay-server/benches/benches.rs 
@@ -11,8 +11,8 @@ use tokio::runtime::Runtime; use relay_base_schema::project::ProjectKey; use relay_server::{ - Envelope, EnvelopeBufferImpl, MemoryChecker, MemoryStat, ProjectKeyPair, - SqliteEnvelopeRepository, SqliteEnvelopeStore, + Envelope, EnvelopeStack, MemoryChecker, MemoryStat, PolymorphicEnvelopeBuffer, + SqliteEnvelopeStack, SqliteEnvelopeStore, }; fn setup_db(path: &PathBuf) -> Pool { @@ -70,7 +70,7 @@ fn mock_envelope_with_project_key(project_key: &ProjectKey, size: &str) -> Box = Config::from_json_value(serde_json::json!({ - "spool": { - "disk_batch_size": disk_batch_size - } - })) - .unwrap() - .into(); - - let stack = SqliteEnvelopeRepository::new_with_store( - &config, + let stack = SqliteEnvelopeStack::new( envelope_store.clone(), + disk_batch_size, + 2, + ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(), + ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(), + true, ); let mut envelopes = Vec::with_capacity(size); @@ -125,7 +116,7 @@ fn benchmark_sqlite_envelope_repository(c: &mut Criterion) { |(mut stack, envelopes)| { runtime.block_on(async { for envelope in envelopes { - stack.push(project_key_pair, envelope).await.unwrap(); + stack.push(envelope).await.unwrap(); } }); }, @@ -143,24 +134,19 @@ fn benchmark_sqlite_envelope_repository(c: &mut Criterion) { runtime.block_on(async { reset_db(db.clone()).await; - let config: Arc = - Config::from_json_value(serde_json::json!({ - "spool": { - "disk_batch_size": disk_batch_size - } - })) - .unwrap() - .into(); - - let mut stack = SqliteEnvelopeRepository::new_with_store( - &config, + let mut stack = SqliteEnvelopeStack::new( envelope_store.clone(), + disk_batch_size, + 2, + ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(), + ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(), + true, ); // Pre-fill the stack for _ in 0..size { let envelope = mock_envelope(envelope_size); - stack.push(project_key_pair, envelope).await.unwrap(); + 
stack.push(envelope).await.unwrap(); } stack @@ -170,7 +156,7 @@ fn benchmark_sqlite_envelope_repository(c: &mut Criterion) { runtime.block_on(async { // Benchmark popping for _ in 0..size { - stack.pop(project_key_pair).await.unwrap(); + stack.pop().await.unwrap(); } }); }, @@ -189,17 +175,13 @@ fn benchmark_sqlite_envelope_repository(c: &mut Criterion) { reset_db(db.clone()).await; }); - let config: Arc = Config::from_json_value(serde_json::json!({ - "spool": { - "disk_batch_size": disk_batch_size - } - })) - .unwrap() - .into(); - - let stack = SqliteEnvelopeRepository::new_with_store( - &config, + let stack = SqliteEnvelopeStack::new( envelope_store.clone(), + disk_batch_size, + 2, + ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(), + ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(), + true, ); // Pre-generate envelopes @@ -214,12 +196,12 @@ fn benchmark_sqlite_envelope_repository(c: &mut Criterion) { for _ in 0..size { if rand::random::() { if let Some(envelope) = envelope_iter.next() { - stack.push(project_key_pair, envelope).await.unwrap(); + stack.push(envelope).await.unwrap(); } - } else if stack.pop(project_key_pair).await.is_err() { + } else if stack.pop().await.is_err() { // If pop fails (empty stack), push instead if let Some(envelope) = envelope_iter.next() { - stack.push(project_key_pair, envelope).await.unwrap(); + stack.push(envelope).await.unwrap(); } } } @@ -280,7 +262,7 @@ fn benchmark_envelope_buffer(c: &mut Criterion) { |envelopes| { runtime.block_on(async { let mut buffer = - EnvelopeBufferImpl::from_config(&config, memory_checker.clone()) + PolymorphicEnvelopeBuffer::from_config(&config, memory_checker.clone()) .await .unwrap(); for envelope in envelopes.into_iter() { @@ -312,7 +294,7 @@ fn benchmark_envelope_buffer(c: &mut Criterion) { |envelopes| { runtime.block_on(async { let mut buffer = - EnvelopeBufferImpl::from_config(&config, memory_checker.clone()) + PolymorphicEnvelopeBuffer::from_config(&config, 
memory_checker.clone()) .await .unwrap(); let n = envelopes.len(); @@ -335,6 +317,6 @@ fn benchmark_envelope_buffer(c: &mut Criterion) { group.finish(); } -criterion_group!(sqlite, benchmark_sqlite_envelope_repository); +criterion_group!(sqlite, benchmark_sqlite_envelope_stack); criterion_group!(buffer, benchmark_envelope_buffer); criterion_main!(sqlite, buffer); diff --git a/relay-server/src/lib.rs b/relay-server/src/lib.rs index 6463a8cc45d..986db2e5d4d 100644 --- a/relay-server/src/lib.rs +++ b/relay-server/src/lib.rs @@ -266,17 +266,12 @@ mod services; mod statsd; mod utils; -// pub for benchmarks -pub use self::envelope::Envelope; -// pub for benchmarks +pub use self::envelope::Envelope; // pub for benchmarks pub use self::services::buffer::{ - EnvelopeBufferImpl, EnvelopeRepository, ProjectKeyPair, SqliteEnvelopeRepository, - SqliteEnvelopeStore, -}; -// pub for benchmarks + EnvelopeStack, PolymorphicEnvelopeBuffer, SqliteEnvelopeStack, SqliteEnvelopeStore, +}; // pub for benchmarks pub use self::services::spooler::spool_utils; -// pub for benchmarks -pub use self::utils::{MemoryChecker, MemoryStat}; +pub use self::utils::{MemoryChecker, MemoryStat}; // pub for benchmarks #[cfg(test)] mod testutils; @@ -293,7 +288,7 @@ use crate::services::server::HttpServer; /// /// This effectively boots the entire server application. It blocks the current thread until a /// shutdown signal is received or a fatal error happens. Behavior of the server is determined by -/// the `config` passed into this function. +/// the `config` passed into this funciton. 
pub fn run(config: Config) -> anyhow::Result<()> { let config = Arc::new(config); relay_log::info!("relay server starting"); diff --git a/relay-server/src/services/buffer/common.rs b/relay-server/src/services/buffer/common.rs index 4834765db6b..924f8aa1c81 100644 --- a/relay-server/src/services/buffer/common.rs +++ b/relay-server/src/services/buffer/common.rs @@ -1,40 +1,15 @@ use relay_base_schema::project::ProjectKey; -use std::convert::Infallible; -use crate::services::buffer::envelope_repository::sqlite::SqliteEnvelopeRepositoryError; -use crate::services::buffer::envelope_store::sqlite::SqliteEnvelopeStoreError; use crate::Envelope; -/// Error that occurs while interacting with the envelope buffer. -#[derive(Debug, thiserror::Error)] -pub enum EnvelopeBufferError { - #[error("sqlite")] - SqliteStore(#[from] SqliteEnvelopeStoreError), - - #[error("sqlite")] - SqliteRepository(#[from] SqliteEnvelopeRepositoryError), - - #[error("failed to push envelope to the buffer")] - PushFailed, -} - -impl From for EnvelopeBufferError { - fn from(value: Infallible) -> Self { - match value {} - } -} - /// Struct that represents two project keys. #[derive(Debug, Clone, Copy, Eq, Hash, Ord, PartialOrd, PartialEq)] pub struct ProjectKeyPair { - /// [`ProjectKey`] of the project of the envelope. pub own_key: ProjectKey, - /// [`ProjectKey`] of the root project of the trace to which the envelope belongs. pub sampling_key: ProjectKey, } impl ProjectKeyPair { - /// Creates a new [`ProjectKeyPair`] with the given `own_key` and `sampling_key`. pub fn new(own_key: ProjectKey, sampling_key: ProjectKey) -> Self { Self { own_key, @@ -42,21 +17,12 @@ impl ProjectKeyPair { } } - /// Creates a [`ProjectKeyPair`] from an [`Envelope`]. - /// - /// The `own_key` is set to the public key from the envelope's metadata. - /// The `sampling_key` is set to the envelope's sampling key if present, - /// otherwise it defaults to the `own_key`. 
pub fn from_envelope(envelope: &Envelope) -> Self { let own_key = envelope.meta().public_key(); let sampling_key = envelope.sampling_key().unwrap_or(own_key); Self::new(own_key, sampling_key) } - /// Returns an iterator over the project keys. - /// - /// The iterator always yields the `own_key` and yields the `sampling_key` - /// only if it's different from the `own_key`. pub fn iter(&self) -> impl Iterator { let Self { own_key, diff --git a/relay-server/src/services/buffer/envelope_buffer/mod.rs b/relay-server/src/services/buffer/envelope_buffer/mod.rs index 3696dc0ec2b..0ff0bc2b7ec 100644 --- a/relay-server/src/services/buffer/envelope_buffer/mod.rs +++ b/relay-server/src/services/buffer/envelope_buffer/mod.rs @@ -1,34 +1,195 @@ use std::cmp::Ordering; use std::collections::BTreeSet; +use std::convert::Infallible; use std::error::Error; +use std::mem; use std::sync::atomic::AtomicI64; use std::sync::atomic::Ordering as AtomicOrdering; use std::sync::Arc; use std::time::Duration; -use crate::envelope::Envelope; -use crate::services::buffer::common::{EnvelopeBufferError, ProjectKeyPair}; -use crate::services::buffer::envelope_repository::EnvelopeRepository; -use crate::statsd::{RelayCounters, RelayGauges, RelayHistograms, RelayTimers}; -use crate::MemoryChecker; -use hashbrown::{HashMap, HashSet}; -use priority_queue::PriorityQueue; +use hashbrown::HashSet; use relay_base_schema::project::ProjectKey; use relay_config::Config; use tokio::time::{timeout, Instant}; +use crate::envelope::Envelope; +use crate::services::buffer::common::ProjectKeyPair; +use crate::services::buffer::envelope_stack::sqlite::SqliteEnvelopeStackError; +use crate::services::buffer::envelope_stack::EnvelopeStack; +use crate::services::buffer::envelope_store::sqlite::SqliteEnvelopeStoreError; +use crate::services::buffer::stack_provider::memory::MemoryStackProvider; +use crate::services::buffer::stack_provider::sqlite::SqliteStackProvider; +use 
crate::services::buffer::stack_provider::{StackCreationType, StackProvider}; +use crate::statsd::{RelayCounters, RelayGauges, RelayHistograms, RelayTimers}; +use crate::utils::MemoryChecker; + +/// Polymorphic envelope buffering interface. +/// +/// The underlying buffer can either be disk-based or memory-based, +/// depending on the given configuration. +/// +/// NOTE: This is implemented as an enum because a trait object with async methods would not be +/// object safe. +#[derive(Debug)] +#[allow(private_interfaces)] +pub enum PolymorphicEnvelopeBuffer { + /// An enveloper buffer that uses in-memory envelopes stacks. + InMemory(EnvelopeBuffer), + /// An enveloper buffer that uses sqlite envelopes stacks. + Sqlite(EnvelopeBuffer), +} + +impl PolymorphicEnvelopeBuffer { + /// Returns true if the implementation stores all envelopes in RAM. + pub fn is_memory(&self) -> bool { + match self { + PolymorphicEnvelopeBuffer::InMemory(_) => true, + PolymorphicEnvelopeBuffer::Sqlite(_) => false, + } + } + + /// Creates either a memory-based or a disk-based envelope buffer, + /// depending on the given configuration. + pub async fn from_config( + config: &Config, + memory_checker: MemoryChecker, + ) -> Result { + let buffer = if config.spool_envelopes_path().is_some() { + relay_log::trace!("PolymorphicEnvelopeBuffer: initializing sqlite envelope buffer"); + let buffer = EnvelopeBuffer::::new(config).await?; + Self::Sqlite(buffer) + } else { + relay_log::trace!("PolymorphicEnvelopeBuffer: initializing memory envelope buffer"); + let buffer = EnvelopeBuffer::::new(memory_checker); + Self::InMemory(buffer) + }; + + Ok(buffer) + } + + /// Initializes the envelope buffer. + pub async fn initialize(&mut self) { + match self { + PolymorphicEnvelopeBuffer::InMemory(buffer) => buffer.initialize().await, + PolymorphicEnvelopeBuffer::Sqlite(buffer) => buffer.initialize().await, + } + } + + /// Adds an envelope to the buffer. 
+ pub async fn push(&mut self, envelope: Box) -> Result<(), EnvelopeBufferError> { + relay_statsd::metric!(timer(RelayTimers::BufferPush), { + match self { + Self::Sqlite(buffer) => buffer.push(envelope).await, + Self::InMemory(buffer) => buffer.push(envelope).await, + }?; + }); + relay_statsd::metric!(counter(RelayCounters::BufferEnvelopesWritten) += 1); + Ok(()) + } + + /// Returns a reference to the next-in-line envelope. + pub async fn peek(&mut self) -> Result { + relay_statsd::metric!(timer(RelayTimers::BufferPeek), { + match self { + Self::Sqlite(buffer) => buffer.peek().await, + Self::InMemory(buffer) => buffer.peek().await, + } + }) + } + + /// Pops the next-in-line envelope. + pub async fn pop(&mut self) -> Result>, EnvelopeBufferError> { + let envelope = relay_statsd::metric!(timer(RelayTimers::BufferPop), { + match self { + Self::Sqlite(buffer) => buffer.pop().await, + Self::InMemory(buffer) => buffer.pop().await, + }? + }); + relay_statsd::metric!(counter(RelayCounters::BufferEnvelopesRead) += 1); + Ok(envelope) + } + + /// Marks a project as ready or not ready. + /// + /// The buffer re-prioritizes its envelopes based on this information. + /// Returns `true` if at least one priority was changed. + pub fn mark_ready(&mut self, project: &ProjectKey, is_ready: bool) -> bool { + match self { + Self::Sqlite(buffer) => buffer.mark_ready(project, is_ready), + Self::InMemory(buffer) => buffer.mark_ready(project, is_ready), + } + } + + /// Marks a stack as seen. + /// + /// Non-ready stacks are deprioritized when they are marked as seen, such that + /// the next call to `.peek()` will look at a different stack. This prevents + /// head-of-line blocking. 
+ pub fn mark_seen(&mut self, project_key_pair: &ProjectKeyPair, next_fetch: Duration) { + match self { + Self::Sqlite(buffer) => buffer.mark_seen(project_key_pair, next_fetch), + Self::InMemory(buffer) => buffer.mark_seen(project_key_pair, next_fetch), + } + } + + /// Returns `true` whether the buffer has capacity to accept new [`Envelope`]s. + pub fn has_capacity(&self) -> bool { + match self { + Self::Sqlite(buffer) => buffer.has_capacity(), + Self::InMemory(buffer) => buffer.has_capacity(), + } + } + + /// Shuts down the [`PolymorphicEnvelopeBuffer`]. + pub async fn shutdown(&mut self) -> bool { + // Currently, we want to flush the buffer only for disk, since the in memory implementation + // tries to not do anything and pop as many elements as possible within the shutdown + // timeout. + let Self::Sqlite(buffer) = self else { + relay_log::trace!("PolymorphicEnvelopeBuffer: shutdown procedure not needed"); + return false; + }; + buffer.flush().await; + + true + } +} + +/// Error that occurs while interacting with the envelope buffer. +#[derive(Debug, thiserror::Error)] +pub enum EnvelopeBufferError { + #[error("sqlite")] + SqliteStore(#[from] SqliteEnvelopeStoreError), + + #[error("sqlite")] + SqliteStack(#[from] SqliteEnvelopeStackError), + + #[error("failed to push envelope to the buffer")] + PushFailed, +} + +impl From for EnvelopeBufferError { + fn from(value: Infallible) -> Self { + match value {} + } +} + /// An envelope buffer that holds an individual stack for each project/sampling project combination. /// /// Envelope stacks are organized in a priority queue, and are re-prioritized every time an envelope /// is pushed, popped, or when a project becomes ready. #[derive(Debug)] -pub struct EnvelopeBuffer { +struct EnvelopeBuffer { /// The central priority queue. - priority_queue: PriorityQueue, - /// A lookup table to find all project key pairs for a given project. 
- project_to_pairs: HashMap>, - /// Repository of envelopes that can provide envelopes via different implementations. - envelope_repository: EnvelopeRepository, + priority_queue: priority_queue::PriorityQueue, Priority>, + /// A lookup table to find all stacks involving a project. + stacks_by_project: hashbrown::HashMap>, + /// A provider of stacks that provides utilities to create stacks, check their capacity... + /// + /// This indirection is needed because different stack implementations might need different + /// initialization (e.g. a database connection). + stack_provider: P, /// The total count of envelopes that the buffer is working with. /// /// Note that this count is not meant to be perfectly accurate since the initialization of the @@ -43,147 +204,155 @@ pub struct EnvelopeBuffer { total_count_initialized: bool, } -impl EnvelopeBuffer { - /// Creates either a memory-based or a disk-based envelope buffer, - /// depending on the given configuration. - pub async fn from_config( - config: &Config, - memory_checker: MemoryChecker, - ) -> Result { - let buffer = if config.spool_envelopes_path().is_some() { - relay_log::trace!("EnvelopeBuffer: initializing sqlite envelope buffer"); - Self { - project_to_pairs: Default::default(), - priority_queue: Default::default(), - envelope_repository: EnvelopeRepository::sqlite(config).await?, - total_count: Arc::new(AtomicI64::new(0)), - total_count_initialized: false, - } - } else { - relay_log::trace!("EnvelopeBuffer: initializing memory envelope buffer"); - Self { - project_to_pairs: Default::default(), - priority_queue: Default::default(), - envelope_repository: EnvelopeRepository::memory(memory_checker)?, - total_count: Arc::new(AtomicI64::new(0)), - total_count_initialized: false, - } - }; +impl EnvelopeBuffer { + /// Creates an empty memory-based buffer. 
+ pub fn new(memory_checker: MemoryChecker) -> Self { + Self { + stacks_by_project: Default::default(), + priority_queue: Default::default(), + stack_provider: MemoryStackProvider::new(memory_checker), + total_count: Arc::new(AtomicI64::new(0)), + total_count_initialized: false, + } + } +} - Ok(buffer) +#[allow(dead_code)] +impl EnvelopeBuffer { + /// Creates an empty sqlite-based buffer. + pub async fn new(config: &Config) -> Result { + Ok(Self { + stacks_by_project: Default::default(), + priority_queue: Default::default(), + stack_provider: SqliteStackProvider::new(config).await?, + total_count: Arc::new(AtomicI64::new(0)), + total_count_initialized: false, + }) } +} +impl EnvelopeBuffer

+where + EnvelopeBufferError: From<::Error>, +{ /// Initializes the [`EnvelopeBuffer`] given the initialization state from the - /// [`EnvelopeRepository`]. + /// [`StackProvider`]. pub async fn initialize(&mut self) { relay_statsd::metric!(timer(RelayTimers::BufferInitialization), { - let initialization_state = self.envelope_repository.initialize().await; - self.load_project_key_pairs(initialization_state.project_key_pairs) + let initialization_state = self.stack_provider.initialize().await; + self.load_stacks(initialization_state.project_key_pairs) .await; self.load_store_total_count().await; }); } - /// Pushes an envelope to the [`EnvelopeRepository`] and updates the priority queue accordingly. + /// Pushes an envelope to the appropriate envelope stack and re-prioritizes the stack. + /// + /// If the envelope stack does not exist, a new stack is pushed to the priority queue. + /// The priority of the stack is updated with the envelope's received_at time. pub async fn push(&mut self, envelope: Box) -> Result<(), EnvelopeBufferError> { - relay_statsd::metric!(timer(RelayTimers::BufferPush), { - let received_at = envelope.meta().start_time().into(); - let project_key_pair = ProjectKeyPair::from_envelope(&envelope); - - // If we haven't seen this project key pair, we will add it to the priority queue, otherwise - // we just update its priority. 
- if self.priority_queue.get_mut(&project_key_pair).is_none() { - self.add(project_key_pair, Some(envelope.as_ref())) - } else { - self.priority_queue - .change_priority_by(&project_key_pair, |prio| { - prio.received_at = received_at; - }); - } - - self.envelope_repository - .push(project_key_pair, envelope) - .await?; + let received_at = envelope.meta().start_time().into(); + let project_key_pair = ProjectKeyPair::from_envelope(&envelope); + if let Some(( + QueueItem { + key: _, + value: stack, + }, + _, + )) = self.priority_queue.get_mut(&project_key_pair) + { + stack.push(envelope).await?; + } else { + // Since we have initialization code that creates all the necessary stacks, we assume + // that any new stack that is added during the envelope buffer's lifecycle, is recreated. + self.push_stack( + StackCreationType::New, + ProjectKeyPair::from_envelope(&envelope), + Some(envelope), + ) + .await?; + } + self.priority_queue + .change_priority_by(&project_key_pair, |prio| { + prio.received_at = received_at; + }); - self.total_count.fetch_add(1, AtomicOrdering::SeqCst); - self.track_total_count(); + self.total_count.fetch_add(1, AtomicOrdering::SeqCst); + self.track_total_count(); - Ok(()) - }) + Ok(()) } /// Returns a reference to the next-in-line envelope, if one exists. pub async fn peek(&mut self) -> Result { - relay_statsd::metric!(timer(RelayTimers::BufferPeek), { - let Some((&project_key_pair, priority)) = self.priority_queue.peek() else { - return Ok(Peek::Empty); - }; + let Some(( + QueueItem { + key: stack_key, + value: stack, + }, + Priority { + readiness, + next_project_fetch, + .. 
+ }, + )) = self.priority_queue.peek_mut() + else { + return Ok(Peek::Empty); + }; - let envelope = self.envelope_repository.peek(project_key_pair).await?; + let ready = readiness.ready(); - Ok(match (envelope, priority.readiness.ready()) { - (None, _) => Peek::Empty, - (Some(envelope), true) => Peek::Ready(envelope), - (Some(envelope), false) => { - Peek::NotReady(project_key_pair, priority.next_project_fetch, envelope) - } - }) + Ok(match (stack.peek().await?, ready) { + (None, _) => Peek::Empty, + (Some(envelope), true) => Peek::Ready(envelope), + (Some(envelope), false) => Peek::NotReady(*stack_key, *next_project_fetch, envelope), }) } /// Returns the next-in-line envelope, if one exists. /// - /// The priority of the [`ProjectKeyPair`] is updated with the next envelope's received_at - /// time. + /// The priority of the envelope's stack is updated with the next envelope's received_at + /// time. If the stack is empty after popping, it is removed from the priority queue. pub async fn pop(&mut self) -> Result>, EnvelopeBufferError> { - relay_statsd::metric!(timer(RelayTimers::BufferPop), { - let Some((&project_key_pair, _)) = self.priority_queue.peek() else { - return Ok(None); - }; - - // There must be an envelope when popping, since we call `peek` in the statement above - // and no concurrent access is performed in the meanwhile. - let envelope = self - .envelope_repository - .pop(project_key_pair) - .await? - .expect("pop returned no envelope"); - - let next_received_at = self - .envelope_repository - .peek(project_key_pair) - .await? 
- .map(|next_envelope| next_envelope.meta().start_time().into()); - match next_received_at { - None => { - relay_statsd::metric!(counter(RelayCounters::BufferEnvelopeStacksPopped) += 1); - self.remove(project_key_pair); - } - Some(next_received_at) => { - self.priority_queue - .change_priority_by(&project_key_pair, |prio| { - prio.received_at = next_received_at; - }); - } + let Some((QueueItem { key, value: stack }, _)) = self.priority_queue.peek_mut() else { + return Ok(None); + }; + let project_key_pair = *key; + let envelope = stack.pop().await.unwrap().expect("found an empty stack"); + + let next_received_at = stack + .peek() + .await? + .map(|next_envelope| next_envelope.meta().start_time().into()); + + match next_received_at { + None => { + relay_statsd::metric!(counter(RelayCounters::BufferEnvelopeStacksPopped) += 1); + self.pop_stack(project_key_pair); + } + Some(next_received_at) => { + self.priority_queue + .change_priority_by(&project_key_pair, |prio| { + prio.received_at = next_received_at; + }); } + } - // We are fine with the count going negative, since it represents that more data was popped, - // than it was initially counted, meaning that we had a wrong total count from - // initialization. - self.total_count.fetch_sub(1, AtomicOrdering::SeqCst); - self.track_total_count(); + // We are fine with the count going negative, since it represents that more data was popped, + // than it was initially counted, meaning that we had a wrong total count from + // initialization. + self.total_count.fetch_sub(1, AtomicOrdering::SeqCst); + self.track_total_count(); - Ok(Some(envelope)) - }) + Ok(Some(envelope)) } - /// Re-prioritizes all [`ProjectKeyPair`]s that involve the given project key by setting it to - /// "ready". + /// Re-prioritizes all stacks that involve the given project key by setting it to "ready". /// /// Returns `true` if at least one priority was changed. 
pub fn mark_ready(&mut self, project: &ProjectKey, is_ready: bool) -> bool { let mut changed = false; - if let Some(project_key_pairs) = self.project_to_pairs.get(project) { + if let Some(project_key_pairs) = self.stacks_by_project.get(project) { for project_key_pair in project_key_pairs { self.priority_queue .change_priority_by(project_key_pair, |stack| { @@ -214,7 +383,7 @@ impl EnvelopeBuffer { changed } - /// Marks a [`ProjectKeyPair`] as seen. + /// Marks a stack as seen. /// /// Non-ready stacks are deprioritized when they are marked as seen, such that /// the next call to `.peek()` will look at a different stack. This prevents @@ -228,39 +397,47 @@ impl EnvelopeBuffer { }); } - /// Returns `true` if the underlying storage has the capacity to store more envelopes, false - /// otherwise. + /// Returns `true` if the underlying storage has the capacity to store more envelopes. pub fn has_capacity(&self) -> bool { - self.envelope_repository.has_store_capacity() + self.stack_provider.has_store_capacity() } /// Flushes the envelope buffer. - /// - /// Returns `true` in case after the flushing it is safe to destroy the buffer, `false` - /// otherwise. This is done because we want to make sure we know whether it is safe to drop the - /// [`EnvelopeBuffer`] after flushing is performed. - pub async fn flush(&mut self) -> bool { - self.envelope_repository.flush().await + pub async fn flush(&mut self) { + let priority_queue = mem::take(&mut self.priority_queue); + self.stack_provider + .flush(priority_queue.into_iter().map(|(q, _)| q.value)) + .await; } - /// Returns `true` if the [`EnvelopeBuffer`] is using an in-memory strategy, false otherwise. - pub fn is_memory(&self) -> bool { - matches!(self.envelope_repository, EnvelopeRepository::Memory(_)) - } - - /// Adds a new [`ProjectKeyPair`] to the `priority_queue` and `project_to_pairs`. 
- fn add(&mut self, project_key_pair: ProjectKeyPair, envelope: Option<&Envelope>) { + /// Pushes a new [`EnvelopeStack`] with the given [`Envelope`] inserted. + async fn push_stack( + &mut self, + stack_creation_type: StackCreationType, + project_key_pair: ProjectKeyPair, + envelope: Option>, + ) -> Result<(), EnvelopeBufferError> { let received_at = envelope .as_ref() .map_or(Instant::now(), |e| e.meta().start_time().into()); - let previous_entry = self - .priority_queue - .push(project_key_pair, Priority::new(received_at)); - debug_assert!(previous_entry.is_none()); + let mut stack = self + .stack_provider + .create_stack(stack_creation_type, project_key_pair); + if let Some(envelope) = envelope { + stack.push(envelope).await?; + } + let previous_entry = self.priority_queue.push( + QueueItem { + key: project_key_pair, + value: stack, + }, + Priority::new(received_at), + ); + debug_assert!(previous_entry.is_none()); for project_key in project_key_pair.iter() { - self.project_to_pairs + self.stacks_by_project .entry(project_key) .or_default() .insert(project_key_pair); @@ -268,12 +445,14 @@ impl EnvelopeBuffer { relay_statsd::metric!( gauge(RelayGauges::BufferStackCount) = self.priority_queue.len() as u64 ); + + Ok(()) } - /// Removes a [`ProjectKeyPair`] from the `priority_queue` and `project_to_pairs`. - fn remove(&mut self, project_key_pair: ProjectKeyPair) { + /// Pops an [`EnvelopeStack`] with the supplied [`EnvelopeBufferError`]. + fn pop_stack(&mut self, project_key_pair: ProjectKeyPair) { for project_key in project_key_pair.iter() { - self.project_to_pairs + self.stacks_by_project .get_mut(&project_key) .expect("project_key is missing from lookup") .remove(&project_key_pair); @@ -285,10 +464,12 @@ impl EnvelopeBuffer { ); } - /// Creates all the priority queue entries given the supplied [`ProjectKeyPair`]s. 
- async fn load_project_key_pairs(&mut self, project_key_pairs: HashSet) { + /// Creates all the [`EnvelopeStack`]s with no data given a set of [`ProjectKeyPair`]. + async fn load_stacks(&mut self, project_key_pairs: HashSet) { for project_key_pair in project_key_pairs { - self.add(project_key_pair, None); + self.push_stack(StackCreationType::Initialization, project_key_pair, None) + .await + .expect("Pushing an empty stack raised an error"); } } @@ -298,10 +479,9 @@ impl EnvelopeBuffer { /// will process, besides the count of elements that will be added and removed during its /// lifecycle async fn load_store_total_count(&mut self) { - let total_count = timeout( - Duration::from_secs(1), - self.envelope_repository.store_total_count(), - ) + let total_count = timeout(Duration::from_secs(1), async { + self.stack_provider.store_total_count().await + }) .await; match total_count { Ok(total_count) => { @@ -330,7 +510,7 @@ impl EnvelopeBuffer { relay_statsd::metric!( histogram(RelayHistograms::BufferEnvelopesCount) = total_count, initialized = initialized, - stack_type = &self.envelope_repository.name() + stack_type = self.stack_provider.stack_type() ); } } @@ -342,6 +522,32 @@ pub enum Peek<'a> { NotReady(ProjectKeyPair, Instant, &'a Envelope), } +#[derive(Debug)] +struct QueueItem { + key: K, + value: V, +} + +impl std::borrow::Borrow for QueueItem { + fn borrow(&self) -> &K { + &self.key + } +} + +impl std::hash::Hash for QueueItem { + fn hash(&self, state: &mut H) { + self.key.hash(state); + } +} + +impl PartialEq for QueueItem { + fn eq(&self, other: &Self) -> bool { + self.key == other.key + } +} + +impl Eq for QueueItem {} + #[derive(Debug, Clone)] struct Priority { readiness: Readiness, @@ -475,13 +681,7 @@ mod tests { envelope } - fn mock_config() -> Arc { - Config::from_json_value(serde_json::json!({})) - .unwrap() - .into() - } - - fn mock_config_with_path(path: &str) -> Arc { + fn mock_config(path: &str) -> Arc { Config::from_json_value(serde_json::json!({ 
"spool": { "envelopes": { @@ -494,10 +694,10 @@ mod tests { } fn mock_memory_checker() -> MemoryChecker { - MemoryChecker::new(MemoryStat::default(), mock_config()) + MemoryChecker::new(MemoryStat::default(), mock_config("my/db/path").clone()) } - async fn peek_project_key(buffer: &mut EnvelopeBuffer) -> ProjectKey { + async fn peek_project_key(buffer: &mut EnvelopeBuffer) -> ProjectKey { buffer .peek() .await @@ -510,9 +710,7 @@ mod tests { #[tokio::test] async fn test_insert_pop() { - let mut buffer = EnvelopeBuffer::from_config(&mock_config(), mock_memory_checker()) - .await - .unwrap(); + let mut buffer = EnvelopeBuffer::::new(mock_memory_checker()); let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap(); let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(); @@ -583,9 +781,7 @@ mod tests { #[tokio::test] async fn test_project_internal_order() { - let mut buffer = EnvelopeBuffer::from_config(&mock_config(), mock_memory_checker()) - .await - .unwrap(); + let mut buffer = EnvelopeBuffer::::new(mock_memory_checker()); let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap(); @@ -612,9 +808,7 @@ mod tests { #[tokio::test] async fn test_sampling_projects() { - let mut buffer = EnvelopeBuffer::from_config(&mock_config(), mock_memory_checker()) - .await - .unwrap(); + let mut buffer = EnvelopeBuffer::::new(mock_memory_checker()); let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap(); let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fef").unwrap(); @@ -730,9 +924,7 @@ mod tests { assert_ne!(project_key_pair1, project_key_pair2); - let mut buffer = EnvelopeBuffer::from_config(&mock_config(), mock_memory_checker()) - .await - .unwrap(); + let mut buffer = EnvelopeBuffer::::new(mock_memory_checker()); buffer .push(new_envelope(project_key1, Some(project_key2), None)) .await @@ -764,9 +956,7 @@ mod tests { #[tokio::test] async fn 
test_last_peek_internal_order() { - let mut buffer = EnvelopeBuffer::from_config(&mock_config(), mock_memory_checker()) - .await - .unwrap(); + let mut buffer = EnvelopeBuffer::::new(mock_memory_checker()); let project_key_1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap(); let event_id_1 = EventId::new(); @@ -823,9 +1013,9 @@ mod tests { .into_os_string() .into_string() .unwrap(); - let config = mock_config_with_path(&path); + let config = mock_config(&path); let mut store = SqliteEnvelopeStore::prepare(&config).await.unwrap(); - let mut buffer = EnvelopeBuffer::from_config(&config, mock_memory_checker()) + let mut buffer = EnvelopeBuffer::::new(&config) .await .unwrap(); @@ -839,7 +1029,7 @@ mod tests { // We assume that the buffer is empty. assert!(buffer.priority_queue.is_empty()); - assert!(buffer.project_to_pairs.is_empty()); + assert!(buffer.stacks_by_project.is_empty()); buffer.initialize().await; @@ -848,6 +1038,6 @@ mod tests { assert_eq!(buffer.priority_queue.len(), 1); // We expect to have an entry per project key, since we have 1 pair, the total entries // should be 2. - assert_eq!(buffer.project_to_pairs.len(), 2); + assert_eq!(buffer.stacks_by_project.len(), 2); } } diff --git a/relay-server/src/services/buffer/envelope_repository/memory.rs b/relay-server/src/services/buffer/envelope_repository/memory.rs deleted file mode 100644 index 58656935ac3..00000000000 --- a/relay-server/src/services/buffer/envelope_repository/memory.rs +++ /dev/null @@ -1,75 +0,0 @@ -use crate::services::buffer::common::ProjectKeyPair; -use crate::{Envelope, MemoryChecker}; -use hashbrown::HashMap; -use std::convert::Infallible; - -/// Provides in-memory storage for envelopes, organized by project key pairs. 
-#[derive(Debug)] -pub struct MemoryEnvelopeRepository { - #[allow(clippy::vec_box)] - envelopes: HashMap>>, - memory_checker: MemoryChecker, -} - -impl MemoryEnvelopeRepository { - /// Creates a new [`MemoryEnvelopeRepository`] with the given memory checker. - pub fn new(memory_checker: MemoryChecker) -> Self { - Self { - envelopes: HashMap::new(), - memory_checker, - } - } - - /// Pushes an envelope to the repository for the given project key pair. - pub async fn push( - &mut self, - project_key_pair: ProjectKeyPair, - envelope: Box, - ) -> Result<(), Infallible> { - self.envelopes - .entry(project_key_pair) - .or_default() - .push(envelope); - - Ok(()) - } - - /// Peeks at the next envelope for the given project key pair without removing it. - pub async fn peek( - &self, - project_key_pair: ProjectKeyPair, - ) -> Result, Infallible> { - Ok(self - .envelopes - .get(&project_key_pair) - .and_then(|envelopes| envelopes.last().map(|boxed| boxed.as_ref()))) - } - - /// Pops and returns the next envelope for the given project key pair. - pub async fn pop( - &mut self, - project_key_pair: ProjectKeyPair, - ) -> Result>, Infallible> { - Ok(self - .envelopes - .get_mut(&project_key_pair) - .and_then(|envelopes| envelopes.pop())) - } - - /// Attempts to flush envelopes to storage. - pub async fn flush(&mut self) -> bool { - // Only if there are no envelopes we can signal to the caller that it is safe to drop the - // buffer. - self.envelopes.is_empty() - } - - /// Checks if there is capacity to store more envelopes. - pub fn has_store_capacity(&self) -> bool { - self.memory_checker.check_memory().has_capacity() - } - - /// Retrieves the total count of envelopes in the store. 
- pub fn store_total_count(&self) -> u64 { - self.envelopes.values().map(|e| e.len() as u64).sum() - } -} diff --git a/relay-server/src/services/buffer/envelope_repository/mod.rs b/relay-server/src/services/buffer/envelope_repository/mod.rs deleted file mode 100644 index c1e2171e95c..00000000000 --- a/relay-server/src/services/buffer/envelope_repository/mod.rs +++ /dev/null @@ -1,136 +0,0 @@ -use crate::services::buffer::common::{EnvelopeBufferError, ProjectKeyPair}; -use crate::services::buffer::envelope_repository::memory::MemoryEnvelopeRepository; -use crate::services::buffer::envelope_repository::sqlite::SqliteEnvelopeRepository; -use crate::{Envelope, MemoryChecker}; -use hashbrown::HashSet; -use relay_config::Config; - -mod memory; -pub mod sqlite; - -/// State of the initialization of the [`EnvelopeRepository`]. -/// -/// This state is necessary for initializing resources whenever a [`EnvelopeRepository`] is used. -#[derive(Debug)] -pub struct InitializationState { - pub project_key_pairs: HashSet, -} - -impl InitializationState { - /// Create a new [`InitializationState`]. - pub fn new(project_key_pairs: HashSet) -> Self { - Self { project_key_pairs } - } - - /// Creates a new empty [`InitializationState`]. - pub fn empty() -> Self { - Self { - project_key_pairs: HashSet::new(), - } - } -} - -/// Represents different types of envelope repositories. -#[derive(Debug)] -pub enum EnvelopeRepository { - /// In-memory envelope repository. - Memory(MemoryEnvelopeRepository), - /// SQLite-based envelope repository. - SQLite(SqliteEnvelopeRepository), -} - -impl EnvelopeRepository { - /// Creates a new memory-based envelope repository. - pub fn memory(memory_checker: MemoryChecker) -> Result { - Ok(Self::Memory(MemoryEnvelopeRepository::new(memory_checker))) - } - - /// Creates a new SQLite-based envelope repository. 
- pub async fn sqlite(config: &Config) -> Result { - Ok(Self::SQLite(SqliteEnvelopeRepository::new(config).await?)) - } - - /// Initializes the [`EnvelopeRepository`] and returns the initialization state. - pub async fn initialize(&mut self) -> InitializationState { - match self { - EnvelopeRepository::Memory(_) => InitializationState::empty(), - EnvelopeRepository::SQLite(repository) => repository.initialize().await, - } - } - - /// Pushes an envelope to the repository for the given project key pair. - pub async fn push( - &mut self, - project_key_pair: ProjectKeyPair, - envelope: Box, - ) -> Result<(), EnvelopeBufferError> { - match self { - EnvelopeRepository::Memory(repository) => { - repository.push(project_key_pair, envelope).await? - } - EnvelopeRepository::SQLite(repository) => { - repository.push(project_key_pair, envelope).await? - } - } - - Ok(()) - } - - /// Peeks at the next envelope for the given project key pair without removing it. - pub async fn peek( - &mut self, - project_key_pair: ProjectKeyPair, - ) -> Result, EnvelopeBufferError> { - let envelope = match self { - EnvelopeRepository::Memory(repository) => repository.peek(project_key_pair).await?, - EnvelopeRepository::SQLite(repository) => repository.peek(project_key_pair).await?, - }; - - Ok(envelope) - } - - /// Pops and returns the next envelope for the given project key pair. - pub async fn pop( - &mut self, - project_key_pair: ProjectKeyPair, - ) -> Result>, EnvelopeBufferError> { - let envelope = match self { - EnvelopeRepository::Memory(repository) => repository.pop(project_key_pair).await?, - EnvelopeRepository::SQLite(repository) => repository.pop(project_key_pair).await?, - }; - - Ok(envelope) - } - - /// Flushes the [`Envelope`]s in the [`EnvelopeRepository`]. 
- pub async fn flush(&mut self) -> bool { - match self { - EnvelopeRepository::Memory(repository) => repository.flush().await, - EnvelopeRepository::SQLite(repository) => repository.flush().await, - } - } - - /// Returns `true` when there is space to store more [`Envelope`]s, `false` otherwise. - pub fn has_store_capacity(&self) -> bool { - match self { - EnvelopeRepository::Memory(repository) => repository.has_store_capacity(), - EnvelopeRepository::SQLite(repository) => repository.has_store_capacity(), - } - } - - /// Returns the total count of [`Envelope`]s in the store. - pub async fn store_total_count(&self) -> u64 { - match self { - EnvelopeRepository::Memory(repository) => repository.store_total_count(), - EnvelopeRepository::SQLite(repository) => repository.store_total_count().await, - } - } - - /// Returns the string representation of the [`EnvelopeRepository`]'s strategy. - pub fn name(&self) -> &'static str { - match self { - EnvelopeRepository::Memory(_) => "memory", - EnvelopeRepository::SQLite(_) => "sqlite", - } - } -} diff --git a/relay-server/src/services/buffer/envelope_repository/sqlite.rs b/relay-server/src/services/buffer/envelope_repository/sqlite.rs deleted file mode 100644 index 61b07953499..00000000000 --- a/relay-server/src/services/buffer/envelope_repository/sqlite.rs +++ /dev/null @@ -1,681 +0,0 @@ -use crate::services::buffer::common::ProjectKeyPair; -use crate::services::buffer::envelope_repository::InitializationState; -use crate::services::buffer::envelope_store::sqlite::SqliteEnvelopeStoreError; -use crate::statsd::{RelayCounters, RelayHistograms, RelayTimers}; -use crate::{Envelope, SqliteEnvelopeStore}; -use hashbrown::{HashMap, HashSet}; -use relay_config::Config; -use std::error::Error; - -/// An error returned when doing an operation on [`SqliteEnvelopeRepository`]. -#[derive(Debug, thiserror::Error)] -pub enum SqliteEnvelopeRepositoryError { - /// Represents an error that occurred in the envelope store. 
- #[error("an error occurred in the envelope store: {0}")] - EnvelopeStoreError(#[from] SqliteEnvelopeStoreError), -} - -#[derive(Debug, Default)] -struct EnvelopeStack { - #[allow(clippy::vec_box)] - cached_envelopes: Vec>, - check_disk: bool, -} - -/// A repository for storing and managing envelopes using SQLite as a backend. -/// -/// This struct manages both in-memory and on-disk storage of envelopes, -/// implementing spooling and unspooling mechanisms to balance between -/// memory usage and disk I/O. -#[derive(Debug)] -pub struct SqliteEnvelopeRepository { - envelope_stacks: HashMap, - envelope_store: SqliteEnvelopeStore, - cached_envelopes_size: u64, - disk_batch_size: usize, - max_disk_size: usize, -} - -impl SqliteEnvelopeRepository { - /// Creates a new [`SqliteEnvelopeRepository`] instance. - pub async fn new(config: &Config) -> Result { - let envelope_store = SqliteEnvelopeStore::prepare(config).await?; - Ok(Self { - envelope_stacks: HashMap::new(), - envelope_store, - cached_envelopes_size: 0, - disk_batch_size: config.spool_envelopes_stack_disk_batch_size(), - max_disk_size: config.spool_envelopes_max_disk_size(), - }) - } - - /// Creates a new [`SqliteEnvelopeRepository`] instance given a [`SqliteEnvelopeStore`]. - pub fn new_with_store(config: &Config, envelope_store: SqliteEnvelopeStore) -> Self { - Self { - envelope_stacks: HashMap::new(), - envelope_store, - cached_envelopes_size: 0, - disk_batch_size: config.spool_envelopes_stack_disk_batch_size(), - max_disk_size: config.spool_envelopes_max_disk_size(), - } - } - - /// Initializes the envelope repository. - /// - /// Retrieves the project key pairs from the envelope store and creates - /// an initialization state. 
- pub async fn initialize(&mut self) -> InitializationState { - match self.envelope_store.project_key_pairs().await { - Ok(project_key_pairs) => { - self.initialize_empty_stacks(&project_key_pairs); - InitializationState::new(project_key_pairs) - } - Err(error) => { - relay_log::error!( - error = &error as &dyn Error, - "failed to initialize the sqlite stack repository" - ); - InitializationState::empty() - } - } - } - - /// Pushes an envelope to the repository for the given project key pair. - /// - /// If the spool threshold is exceeded, it may trigger spooling to disk. - pub async fn push( - &mut self, - project_key_pair: ProjectKeyPair, - envelope: Box, - ) -> Result<(), SqliteEnvelopeRepositoryError> { - if self.above_spool_threshold() { - self.spool_to_disk().await?; - } - - self.envelope_stacks - .entry(project_key_pair) - .or_default() - .cached_envelopes - .push(envelope); - - self.cached_envelopes_size += 1; - - Ok(()) - } - - /// Peeks at the next envelope for the given project key pair without removing it. - /// - /// If no envelope is in the buffer, it will be loaded from disk and a reference will be - /// returned. - pub async fn peek( - &mut self, - project_key_pair: ProjectKeyPair, - ) -> Result, SqliteEnvelopeRepositoryError> { - // If we have no data for the project key pair, we can safely assume we don't have envelopes - // for this pair anywhere. - let Some(envelope_stack) = self.envelope_stacks.get(&project_key_pair) else { - return Ok(None); - }; - - if envelope_stack.cached_envelopes.is_empty() && envelope_stack.check_disk { - let envelopes = self.unspool_from_disk(project_key_pair, 1).await?; - // If we have no envelopes in the buffer and no on disk, we can be safe removing the entry - // in the buffer. 
- if envelopes.is_empty() { - self.envelope_stacks.remove(&project_key_pair); - return Ok(None); - } - - self.cached_envelopes_size += envelopes.len() as u64; - self.envelope_stacks - .entry(project_key_pair) - .or_default() - .cached_envelopes - .extend(envelopes); - } - - Ok(self - .envelope_stacks - .get(&project_key_pair) - .and_then(|e| e.cached_envelopes.last().map(Box::as_ref))) - } - - /// Pops and returns the next envelope for the given project key pair. - /// - /// If no envelope is in the buffer, it will be loaded from disk. - pub async fn pop( - &mut self, - project_key_pair: ProjectKeyPair, - ) -> Result>, SqliteEnvelopeRepositoryError> { - let envelope = self - .envelope_stacks - .get_mut(&project_key_pair) - .and_then(|envelopes| envelopes.cached_envelopes.pop()); - if let Some(envelope) = envelope { - // We only decrement the counter when removing data from the in memory buffer. - self.cached_envelopes_size -= 1; - return Ok(Some(envelope)); - } - - // If we don't need to check disk, we assume there are no envelopes, so we early return - // `None`. - if !self.should_check_disk(project_key_pair) { - return Ok(None); - } - - // If we have no envelopes in the buffer, we try to pop and immediately return data from - // disk. - let mut envelopes = self.unspool_from_disk(project_key_pair, 1).await?; - // If we have no envelopes in the buffer and no on disk, we can be safe removing the entry - // in the buffer. - if envelopes.is_empty() { - self.envelope_stacks.remove(&project_key_pair); - } - - Ok(envelopes.pop()) - } - - /// Flushes all remaining envelopes to disk. 
- pub async fn flush(&mut self) -> bool { - relay_statsd::metric!(timer(RelayTimers::BufferFlush), { - let envelope_store = self.envelope_store.clone(); - for batch in self.get_envelope_batches() { - if let Err(error) = Self::insert_envelope_batch(envelope_store.clone(), batch).await - { - relay_log::error!( - error = &error as &dyn Error, - "failed to flush envelopes, some might be lost", - ); - } - } - }); - - true - } - - /// Checks if there's capacity in the store for more envelopes. - pub fn has_store_capacity(&self) -> bool { - (self.envelope_store.usage() as usize) < self.max_disk_size - } - - /// Retrieves the total count of envelopes in the store. - pub async fn store_total_count(&self) -> u64 { - self.envelope_store - .total_count() - .await - .unwrap_or_else(|error| { - relay_log::error!( - error = &error as &dyn Error, - "failed to get the total count of envelopes for the sqlite envelope store", - ); - // In case we have an error, we default to communicating a total count of 0. - 0 - }) - } - - /// Initializes a set of empty [`EnvelopeStack`]s. - fn initialize_empty_stacks(&mut self, project_key_pairs: &HashSet) { - for &project_key_pair in project_key_pairs { - let envelope_stack = self.envelope_stacks.entry(project_key_pair).or_default(); - // When creating an envelope stack during initialization, we assume data is on disk. - envelope_stack.check_disk = true; - } - } - - /// Determines if the number of buffered envelopes is above the spool threshold. - fn above_spool_threshold(&self) -> bool { - self.cached_envelopes_size >= self.disk_batch_size as u64 - } - - /// Spools all buffered envelopes to disk. 
- async fn spool_to_disk(&mut self) -> Result<(), SqliteEnvelopeRepositoryError> { - let envelope_store = self.envelope_store.clone(); - let mut processed_batches = 0; - for batch in self.get_envelope_batches() { - Self::insert_envelope_batch(envelope_store.clone(), batch).await?; - processed_batches += 1; - } - // We should have only one batch here, since we spool when we reach the batch size. - debug_assert!(processed_batches == 1); - - Ok(()) - } - - /// Unspools from disk up to `n` envelopes and returns them. - async fn unspool_from_disk( - &mut self, - project_key_pair: ProjectKeyPair, - n: u64, - ) -> Result>, SqliteEnvelopeRepositoryError> { - let envelopes = relay_statsd::metric!(timer(RelayTimers::BufferUnspool), { - self.envelope_store - .delete_many( - project_key_pair.own_key, - project_key_pair.sampling_key, - n as i64, - ) - .await - .map_err(SqliteEnvelopeRepositoryError::EnvelopeStoreError)? - }); - - if envelopes.is_empty() { - // In case no envelopes were unspooled, we mark this project key pair as having no - // envelopes on disk. - self.set_check_disk(project_key_pair, false); - - return Ok(vec![]); - } - - relay_statsd::metric!( - counter(RelayCounters::BufferUnspooledEnvelopes) += envelopes.len() as u64 - ); - - Ok(envelopes) - } - - /// Returns `true` whether the disk should be checked for data for a given [`ProjectKeyPair`], - /// false otherwise. - fn should_check_disk(&self, project_key_pair: ProjectKeyPair) -> bool { - // If a project key pair is unknown, we don't want to check disk. - self.envelope_stacks - .get(&project_key_pair) - .map_or(false, |e| e.check_disk) - } - - /// Sets on the [`EnvelopeStack`] whether the disk should be checked or not. - fn set_check_disk(&mut self, project_key_pair: ProjectKeyPair, check_disk: bool) { - if let Some(envelope_stack) = self.envelope_stacks.get_mut(&project_key_pair) { - envelope_stack.check_disk = check_disk; - } - } - - /// Returns batches of envelopes of size `self.disk_batch_size`. 
- #[allow(clippy::vec_box)] - fn get_envelope_batches(&mut self) -> impl Iterator>> + '_ { - // Create a flat iterator over all the envelopes - let envelope_iter = self.envelope_stacks.values_mut().flat_map(|e| { - e.check_disk = true; - self.cached_envelopes_size -= e.cached_envelopes.len() as u64; - relay_statsd::metric!( - histogram(RelayHistograms::BufferInMemoryEnvelopesPerKeyPair) = - e.cached_envelopes.len() as u64 - ); - e.cached_envelopes.drain(..) - }); - - // Wrap this flat iterator with a custom chunking logic - ChunkedIterator { - inner: envelope_iter, - chunk_size: self.disk_batch_size, - } - } - - /// Inserts a batch of envelopes into the envelope store. - #[allow(clippy::vec_box)] - async fn insert_envelope_batch( - mut envelope_store: SqliteEnvelopeStore, - batch: Vec>, - ) -> Result<(), SqliteEnvelopeRepositoryError> { - if batch.is_empty() { - return Ok(()); - } - - relay_statsd::metric!(counter(RelayCounters::BufferSpooledEnvelopes) += batch.len() as u64); - - // Convert envelopes into a format which simplifies insertion in the store. 
- let envelopes = batch.iter().filter_map(|e| e.as_ref().try_into().ok()); - - relay_statsd::metric!(timer(RelayTimers::BufferSpool), { - envelope_store - .insert_many(envelopes) - .await - .map_err(SqliteEnvelopeRepositoryError::EnvelopeStoreError)?; - }); - - Ok(()) - } -} - -struct ChunkedIterator -where - I: Iterator>, -{ - inner: I, - chunk_size: usize, -} - -impl Iterator for ChunkedIterator -where - I: Iterator>, -{ - type Item = Vec>; - - fn next(&mut self) -> Option { - let mut batch = Vec::with_capacity(self.chunk_size); - - // Fill up the batch with up to `chunk_size` envelopes - for _ in 0..self.chunk_size { - if let Some(envelope) = self.inner.next() { - batch.push(envelope); - } else { - break; // Stop when there are no more items - } - } - - // Return `None` if no more batches are available - if batch.is_empty() { - None - } else { - Some(batch) - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::services::buffer::testutils::utils::{ - mock_envelope, mock_envelopes, mock_envelopes_for_project, setup_db, - }; - use relay_base_schema::project::ProjectKey; - use std::time::{Duration, Instant}; - - async fn setup_repository( - run_migrations: bool, - disk_batch_size: usize, - max_disk_size: usize, - ) -> SqliteEnvelopeRepository { - let db = setup_db(run_migrations).await; - let envelope_store = SqliteEnvelopeStore::new(db, Duration::from_millis(100)); - - SqliteEnvelopeRepository { - envelope_stacks: HashMap::new(), - envelope_store, - cached_envelopes_size: 0, - disk_batch_size, - max_disk_size, - } - } - - #[tokio::test] - async fn test_initialize_with_unmigrated_db() { - let mut repository = setup_repository(false, 2, 0).await; - - let initialization_state = repository.initialize().await; - assert!(initialization_state.project_key_pairs.is_empty()); - } - - #[tokio::test] - async fn test_push_with_unmigrated_db() { - let mut repository = setup_repository(false, 1, 0).await; - - let project_key_pair = ProjectKeyPair::new( - 
ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), - ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), - ); - - let envelope_1 = mock_envelope(Instant::now(), Some(project_key_pair.sampling_key)); - let envelope_2 = mock_envelope(Instant::now(), Some(project_key_pair.sampling_key)); - - // Push should succeed as it doesn't interact with the database initially - assert!(repository.push(project_key_pair, envelope_1).await.is_ok()); - - // Push should fail because after the second insertion we try to spool - let result = repository.push(project_key_pair, envelope_2).await; - assert!(result.is_err()); - if let Err(error) = result { - assert!(matches!( - error, - SqliteEnvelopeRepositoryError::EnvelopeStoreError(_) - )); - } - } - - #[tokio::test] - async fn test_pop_with_unmigrated_db() { - let mut repository = setup_repository(false, 1, 0).await; - - let project_key_pair = ProjectKeyPair::new( - ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), - ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), - ); - - // We initialize empty stacks to make sure the repository checks for disk - let mut project_key_pairs = HashSet::new(); - project_key_pairs.insert(project_key_pair); - repository.initialize_empty_stacks(&project_key_pairs); - - // Pop should fail because we can't unspool data from disk - let result = repository.pop(project_key_pair).await; - assert!(result.is_err()); - if let Err(error) = result { - assert!(matches!( - error, - SqliteEnvelopeRepositoryError::EnvelopeStoreError(_) - )); - } - } - - #[tokio::test] - async fn test_push_and_pop() { - let mut repository = setup_repository(true, 2, 0).await; - let project_key_pair = ProjectKeyPair::new( - ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), - ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), - ); - - let envelopes = mock_envelopes(5); - - // Push 5 envelopes - for envelope in envelopes.clone() { - 
assert!(repository.push(project_key_pair, envelope).await.is_ok()); - } - - // Pop 5 envelopes - for envelope in envelopes.iter().rev() { - let popped_envelope = repository.pop(project_key_pair).await.unwrap().unwrap(); - assert_eq!( - popped_envelope.event_id().unwrap(), - envelope.event_id().unwrap() - ); - } - - // Ensure the repository is empty - assert!(repository.pop(project_key_pair).await.unwrap().is_none()); - assert!(!repository.envelope_stacks.contains_key(&project_key_pair)); - } - - #[tokio::test] - async fn test_peek() { - let mut repository = setup_repository(true, 2, 0).await; - let project_key_pair = ProjectKeyPair::new( - ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), - ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), - ); - - let envelope = mock_envelope(Instant::now(), None); - repository - .push(project_key_pair, envelope.clone()) - .await - .unwrap(); - - // Peek at the envelope - let peeked_envelope = repository.peek(project_key_pair).await.unwrap().unwrap(); - assert_eq!( - peeked_envelope.event_id().unwrap(), - envelope.event_id().unwrap() - ); - - // Ensure the envelope is still there after peeking - let popped_envelope = repository.pop(project_key_pair).await.unwrap().unwrap(); - assert_eq!( - popped_envelope.event_id().unwrap(), - envelope.event_id().unwrap() - ); - } - - #[tokio::test] - async fn test_spool_and_unspool_disk() { - let mut repository = setup_repository(true, 5, 0).await; - let project_key_pair = ProjectKeyPair::new( - ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), - ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), - ); - - let envelopes = mock_envelopes(15); - - // Push 15 envelopes (should trigger spooling after 5) - for envelope in envelopes.clone() { - assert!(repository.push(project_key_pair, envelope).await.is_ok()); - } - - // Check that we have 5 envelopes in memory (1 batch of 3) - assert_eq!(repository.cached_envelopes_size, 5); - 
assert_eq!(repository.store_total_count().await, 10); - - // Pop all envelopes - for envelope in envelopes.iter().rev() { - let popped_envelope = repository.pop(project_key_pair).await.unwrap().unwrap(); - assert_eq!( - popped_envelope.event_id().unwrap(), - envelope.event_id().unwrap(), - ); - } - - // Ensure the repository is now empty - assert!(repository.pop(project_key_pair).await.unwrap().is_none()); - assert_eq!(repository.cached_envelopes_size, 0); - assert_eq!(repository.store_total_count().await, 0); - } - - #[tokio::test] - async fn test_flush() { - let mut repository = setup_repository(true, 2, 1000).await; - let project_key_pair = ProjectKeyPair::new( - ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), - ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), - ); - - let envelopes = mock_envelopes(5); - - // Push 5 envelopes - for envelope in envelopes.clone() { - assert!(repository.push(project_key_pair, envelope).await.is_ok()); - } - - // Flush all envelopes to disk - assert!(repository.flush().await); - - // Check that all envelopes are now on disk - assert_eq!(repository.store_total_count().await, 5); - - // Pop all envelopes (should trigger unspool from disk) - for envelope in envelopes.iter().rev() { - let popped_envelope = repository.pop(project_key_pair).await.unwrap().unwrap(); - assert_eq!( - popped_envelope.event_id().unwrap(), - envelope.event_id().unwrap() - ); - } - - // Ensure the repository is empty - assert!(repository.pop(project_key_pair).await.unwrap().is_none()); - assert_eq!(repository.store_total_count().await, 0); - } - - #[tokio::test] - async fn test_multiple_project_key_pairs() { - let mut repository = setup_repository(true, 2, 1000).await; - let project_key_pair1 = ProjectKeyPair::new( - ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), - ProjectKey::parse("b28ae32be2584e0bbd7a4cbb95971fee").unwrap(), - ); - let project_key_pair2 = ProjectKeyPair::new( - 
ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), - ProjectKey::parse("c67ae32be2584e0bbd7a4cbb95971fee").unwrap(), - ); - - let envelopes1 = mock_envelopes_for_project(3, project_key_pair1.sampling_key); - let envelopes2 = mock_envelopes_for_project(2, project_key_pair2.sampling_key); - - // Push envelopes for both project key pairs - for envelope in envelopes1.clone() { - assert!(repository.push(project_key_pair1, envelope).await.is_ok()); - } - for envelope in envelopes2.clone() { - assert!(repository.push(project_key_pair2, envelope).await.is_ok()); - } - - // Pop envelopes for project_key_pair1 - for envelope in envelopes1.iter().rev() { - let popped_envelope = repository.pop(project_key_pair1).await.unwrap().unwrap(); - assert_eq!( - popped_envelope.event_id().unwrap(), - envelope.event_id().unwrap() - ); - } - - // Pop envelopes for project_key_pair2 - for envelope in envelopes2.iter().rev() { - let popped_envelope = repository.pop(project_key_pair2).await.unwrap().unwrap(); - assert_eq!( - popped_envelope.event_id().unwrap(), - envelope.event_id().unwrap() - ); - } - - // Ensure both project key pairs are empty - assert!(repository.pop(project_key_pair1).await.unwrap().is_none()); - assert!(repository.pop(project_key_pair2).await.unwrap().is_none()); - } - - #[tokio::test] - async fn test_check_disk() { - let mut repository = setup_repository(true, 2, 0).await; - let project_key_pair1 = ProjectKeyPair::new( - ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), - ProjectKey::parse("b28ae32be2584e0bbd7a4cbb95971fee").unwrap(), - ); - let project_key_pair2 = ProjectKeyPair::new( - ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), - ProjectKey::parse("c67ae32be2584e0bbd7a4cbb95971fee").unwrap(), - ); - - // Push 3 envelopes for project_key_pair1 (should trigger spooling) - let envelopes1 = mock_envelopes_for_project(3, project_key_pair1.sampling_key); - for envelope in envelopes1 { - 
assert!(repository.push(project_key_pair1, envelope).await.is_ok()); - } - - // Since we spool, we expect to be able to check disk for project_key_pair1 - for (&project_key_pair, envelope_stack) in repository.envelope_stacks.iter() { - assert_eq!( - envelope_stack.check_disk, - project_key_pair == project_key_pair1 - ); - } - - // Pop all envelopes for project_key_pair1 - while repository.pop(project_key_pair1).await.unwrap().is_some() {} - assert_eq!(repository.store_total_count().await, 0); - - // Push 1 envelope for project_key_pair2 (should not trigger spooling) - let envelope = mock_envelope(Instant::now(), Some(project_key_pair2.sampling_key)); - assert!(repository.push(project_key_pair2, envelope).await.is_ok()); - - // Flush remaining envelopes to disk - assert!(repository.flush().await); - - // After flushing, we expect to be able to check disk for project_key_pair2 - for (&project_key_pair, envelope_stack) in repository.envelope_stacks.iter() { - assert_eq!( - envelope_stack.check_disk, - project_key_pair == project_key_pair2 - ); - } - - // Pop all envelopes for project_key_pair1 - while repository.pop(project_key_pair2).await.unwrap().is_some() {} - assert_eq!(repository.store_total_count().await, 0); - } -} diff --git a/relay-server/src/services/buffer/envelope_stack/memory.rs b/relay-server/src/services/buffer/envelope_stack/memory.rs new file mode 100644 index 00000000000..ceb771ec95a --- /dev/null +++ b/relay-server/src/services/buffer/envelope_stack/memory.rs @@ -0,0 +1,35 @@ +use std::convert::Infallible; + +use crate::Envelope; + +use super::EnvelopeStack; + +#[derive(Debug)] +pub struct MemoryEnvelopeStack(#[allow(clippy::vec_box)] Vec>); + +impl MemoryEnvelopeStack { + pub fn new() -> Self { + Self(vec![]) + } +} + +impl EnvelopeStack for MemoryEnvelopeStack { + type Error = Infallible; + + async fn push(&mut self, envelope: Box) -> Result<(), Self::Error> { + self.0.push(envelope); + Ok(()) + } + + async fn peek(&mut self) -> Result, 
Self::Error> { + Ok(self.0.last().map(Box::as_ref)) + } + + async fn pop(&mut self) -> Result>, Self::Error> { + Ok(self.0.pop()) + } + + fn flush(self) -> Vec> { + self.0 + } +} diff --git a/relay-server/src/services/buffer/envelope_stack/mod.rs b/relay-server/src/services/buffer/envelope_stack/mod.rs new file mode 100644 index 00000000000..8acfff06007 --- /dev/null +++ b/relay-server/src/services/buffer/envelope_stack/mod.rs @@ -0,0 +1,26 @@ +use std::future::Future; + +use crate::envelope::Envelope; + +pub mod memory; +pub mod sqlite; + +/// A stack-like data structure that holds [`Envelope`]s. +pub trait EnvelopeStack: Send + std::fmt::Debug { + /// The error type that is returned when an error is encountered during reading or writing the + /// [`EnvelopeStack`]. + type Error: std::fmt::Debug; + + /// Pushes an [`Envelope`] on top of the stack. + fn push(&mut self, envelope: Box) -> impl Future>; + + /// Peeks the [`Envelope`] on top of the stack. + fn peek(&mut self) -> impl Future, Self::Error>>; + + /// Pops the [`Envelope`] on top of the stack. + fn pop(&mut self) -> impl Future>, Self::Error>>; + + /// Persists all envelopes in the [`EnvelopeStack`]s to external storage, if possible, + /// and consumes the stack provider. 
+ fn flush(self) -> Vec>; +} diff --git a/relay-server/src/services/buffer/envelope_stack/sqlite.rs b/relay-server/src/services/buffer/envelope_stack/sqlite.rs new file mode 100644 index 00000000000..703569ddab3 --- /dev/null +++ b/relay-server/src/services/buffer/envelope_stack/sqlite.rs @@ -0,0 +1,496 @@ +use std::collections::VecDeque; +use std::fmt::Debug; +use std::num::NonZeroUsize; + +use relay_base_schema::project::ProjectKey; + +use crate::envelope::Envelope; +use crate::services::buffer::envelope_stack::EnvelopeStack; +use crate::services::buffer::envelope_store::sqlite::{ + SqliteEnvelopeStore, SqliteEnvelopeStoreError, +}; +use crate::statsd::{RelayCounters, RelayTimers}; + +/// An error returned when doing an operation on [`SqliteEnvelopeStack`]. +#[derive(Debug, thiserror::Error)] +pub enum SqliteEnvelopeStackError { + #[error("an error occurred in the envelope store: {0}")] + EnvelopeStoreError(#[from] SqliteEnvelopeStoreError), +} + +#[derive(Debug)] +/// An [`EnvelopeStack`] that is implemented on an SQLite database. +/// +/// For efficiency reasons, the implementation has an in-memory buffer that is periodically spooled +/// to disk in a batched way. +pub struct SqliteEnvelopeStack { + /// Shared SQLite database pool which will be used to read and write from disk. + envelope_store: SqliteEnvelopeStore, + /// Threshold defining the maximum number of envelopes in the `batches_buffer` before spooling + /// to disk will take place. + spool_threshold: NonZeroUsize, + /// Size of a batch of envelopes that is written to disk. + batch_size: NonZeroUsize, + /// The project key of the project to which all the envelopes belong. + own_key: ProjectKey, + /// The project key of the root project of the trace to which all the envelopes belong. + sampling_key: ProjectKey, + /// In-memory stack containing all the batches of envelopes that either have not been written to disk yet, or have been read from disk recently. 
+ #[allow(clippy::vec_box)] + batches_buffer: VecDeque>>, + /// The total number of envelopes inside the `batches_buffer`. + batches_buffer_size: usize, + /// Boolean representing whether calls to `push()` and `peek()` check disk in case not enough + /// elements are available in the `batches_buffer`. + check_disk: bool, +} + +impl SqliteEnvelopeStack { + /// Creates a new empty [`SqliteEnvelopeStack`]. + pub fn new( + envelope_store: SqliteEnvelopeStore, + disk_batch_size: usize, + max_batches: usize, + own_key: ProjectKey, + sampling_key: ProjectKey, + check_disk: bool, + ) -> Self { + Self { + envelope_store, + spool_threshold: NonZeroUsize::new(disk_batch_size * max_batches) + .expect("the spool threshold must be > 0"), + batch_size: NonZeroUsize::new(disk_batch_size) + .expect("the disk batch size must be > 0"), + own_key, + sampling_key, + batches_buffer: VecDeque::with_capacity(max_batches), + batches_buffer_size: 0, + check_disk, + } + } + + /// Threshold above which the [`SqliteEnvelopeStack`] will spool data from the `buffer` to disk. + fn above_spool_threshold(&self) -> bool { + self.batches_buffer_size >= self.spool_threshold.get() + } + + /// Threshold below which the [`SqliteEnvelopeStack`] will unspool data from disk to the + /// `buffer`. + fn below_unspool_threshold(&self) -> bool { + self.batches_buffer_size == 0 + } + + /// Spools to disk up to `disk_batch_size` envelopes from the `buffer`. + /// + /// In case there is a failure while writing envelopes, all the envelopes that were enqueued + /// to be written to disk are lost. The explanation for this behavior can be found in the body + /// of the method. 
+ async fn spool_to_disk(&mut self) -> Result<(), SqliteEnvelopeStackError> { + let Some(envelopes) = self.batches_buffer.pop_front() else { + return Ok(()); + }; + self.batches_buffer_size -= envelopes.len(); + + relay_statsd::metric!( + counter(RelayCounters::BufferSpooledEnvelopes) += envelopes.len() as u64 + ); + + // We convert envelopes into a format which simplifies insertion in the store. If an + // envelope can't be serialized, we will not insert it. + let envelopes = envelopes.iter().filter_map(|e| e.as_ref().try_into().ok()); + + // When we early return here, we are acknowledging that the elements that we popped from + // the buffer are lost in case of failure. We are doing this on purpose, since if we were + // to have a database corruption during runtime, and we were to put the values back into + // the buffer we will end up with an infinite cycle. + relay_statsd::metric!(timer(RelayTimers::BufferSpool), { + self.envelope_store + .insert_many(envelopes) + .await + .map_err(SqliteEnvelopeStackError::EnvelopeStoreError)?; + }); + + // If we successfully spooled to disk, we know that data should be there. + self.check_disk = true; + + Ok(()) + } + + /// Unspools from disk up to `disk_batch_size` envelopes and appends them to the `buffer`. + /// + /// In case a single deletion fails, the affected envelope will not be unspooled and unspooling + /// will continue with the remaining envelopes. + /// + /// In case an envelope fails deserialization due to malformed data in the database, the affected + /// envelope will not be unspooled and unspooling will continue with the remaining envelopes. + async fn unspool_from_disk(&mut self) -> Result<(), SqliteEnvelopeStackError> { + let envelopes = relay_statsd::metric!(timer(RelayTimers::BufferUnspool), { + self.envelope_store + .delete_many( + self.own_key, + self.sampling_key, + self.batch_size.get() as i64, + ) + .await + .map_err(SqliteEnvelopeStackError::EnvelopeStoreError)?
+ }); + + if envelopes.is_empty() { + // In case no envelopes were unspooled, we will mark the disk as empty until another + // round of spooling takes place. + self.check_disk = false; + + return Ok(()); + } + + relay_statsd::metric!( + counter(RelayCounters::BufferUnspooledEnvelopes) += envelopes.len() as u64 + ); + + // We push to the front of the buffer, since we still want to give priority to + // incoming envelopes that have a more recent timestamp. + self.batches_buffer_size += envelopes.len(); + self.batches_buffer.push_front(envelopes); + + Ok(()) + } + + /// Validates that the incoming [`Envelope`] has the same project keys as the + /// [`SqliteEnvelopeStack`]. + fn validate_envelope(&self, envelope: &Envelope) -> bool { + let own_key = envelope.meta().public_key(); + let sampling_key = envelope.sampling_key().unwrap_or(own_key); + + self.own_key == own_key && self.sampling_key == sampling_key + } +} + +impl EnvelopeStack for SqliteEnvelopeStack { + type Error = SqliteEnvelopeStackError; + + async fn push(&mut self, envelope: Box) -> Result<(), Self::Error> { + debug_assert!(self.validate_envelope(&envelope)); + + if self.above_spool_threshold() { + self.spool_to_disk().await?; + } + + // We need to check if the topmost batch has space, if not we have to create a new batch and + // push it at the back (the top of the stack). + if let Some(last_batch) = self + .batches_buffer + .back_mut() + .filter(|last_batch| last_batch.len() < self.batch_size.get()) + { + last_batch.push(envelope); + } else { + let mut new_batch = Vec::with_capacity(self.batch_size.get()); + new_batch.push(envelope); + self.batches_buffer.push_back(new_batch); + } + + self.batches_buffer_size += 1; + + Ok(()) + } + + async fn peek(&mut self) -> Result, Self::Error> { + if self.below_unspool_threshold() && self.check_disk { + self.unspool_from_disk().await?
+ } + + let last = self + .batches_buffer + .back() + .and_then(|last_batch| last_batch.last()) + .map(|last_batch| last_batch.as_ref()); + + Ok(last) + } + + async fn pop(&mut self) -> Result>, Self::Error> { + if self.below_unspool_threshold() && self.check_disk { + relay_log::trace!("Unspool from disk"); + self.unspool_from_disk().await? + } + + let result = self.batches_buffer.back_mut().and_then(|last_batch| { + self.batches_buffer_size -= 1; + relay_log::trace!("Popping from memory"); + last_batch.pop() + }); + if result.is_none() { + return Ok(None); + } + + // Since we might leave a batch without elements, we want to pop it from the buffer. + if self + .batches_buffer + .back() + .map_or(false, |last_batch| last_batch.is_empty()) + { + self.batches_buffer.pop_back(); + } + + Ok(result) + } + + fn flush(self) -> Vec> { + self.batches_buffer.into_iter().flatten().collect() + } +} + +#[cfg(test)] +mod tests { + use std::time::{Duration, Instant}; + + use relay_base_schema::project::ProjectKey; + + use super::*; + use crate::services::buffer::testutils::utils::{mock_envelope, mock_envelopes, setup_db}; + + #[tokio::test] + #[should_panic] + async fn test_push_with_mismatching_project_keys() { + let db = setup_db(false).await; + let envelope_store = SqliteEnvelopeStore::new(db, Duration::from_millis(100)); + let mut stack = SqliteEnvelopeStack::new( + envelope_store, + 2, + 2, + ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), + ProjectKey::parse("c25ae32be2584e0bbd7a4cbb95971fe1").unwrap(), + true, + ); + + let envelope = mock_envelope(Instant::now()); + let _ = stack.push(envelope).await; + } + + #[tokio::test] + async fn test_push_when_db_is_not_valid() { + let db = setup_db(false).await; + let envelope_store = SqliteEnvelopeStore::new(db, Duration::from_millis(100)); + let mut stack = SqliteEnvelopeStack::new( + envelope_store, + 2, + 2, + ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), + 
ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), + true, + ); + + let envelopes = mock_envelopes(4); + + // We push the 4 envelopes without errors because they are below the threshold. + for envelope in envelopes.clone() { + assert!(stack.push(envelope).await.is_ok()); + } + + // We push 1 more envelope which results in spooling, which fails because of a database + // problem. + let envelope = mock_envelope(Instant::now()); + assert!(matches!( + stack.push(envelope).await, + Err(SqliteEnvelopeStackError::EnvelopeStoreError(_)) + )); + + // The stack now contains the last of the 3 elements that were added. If we add a new one + // we will end up with 2. + let envelope = mock_envelope(Instant::now()); + assert!(stack.push(envelope.clone()).await.is_ok()); + assert_eq!(stack.batches_buffer_size, 3); + + // We pop the remaining elements, expecting the last added envelope to be on top. + let popped_envelope_1 = stack.pop().await.unwrap().unwrap(); + let popped_envelope_2 = stack.pop().await.unwrap().unwrap(); + let popped_envelope_3 = stack.pop().await.unwrap().unwrap(); + assert_eq!( + popped_envelope_1.event_id().unwrap(), + envelope.event_id().unwrap() + ); + assert_eq!( + popped_envelope_2.event_id().unwrap(), + envelopes.clone()[3].event_id().unwrap() + ); + assert_eq!( + popped_envelope_3.event_id().unwrap(), + envelopes.clone()[2].event_id().unwrap() + ); + assert_eq!(stack.batches_buffer_size, 0); + } + + #[tokio::test] + async fn test_pop_when_db_is_not_valid() { + let db = setup_db(false).await; + let envelope_store = SqliteEnvelopeStore::new(db, Duration::from_millis(100)); + let mut stack = SqliteEnvelopeStack::new( + envelope_store, + 2, + 2, + ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), + ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), + true, + ); + + // We pop with an invalid db. 
+ assert!(matches!( + stack.pop().await, + Err(SqliteEnvelopeStackError::EnvelopeStoreError(_)) + )); + } + + #[tokio::test] + async fn test_pop_when_stack_is_empty() { + let db = setup_db(true).await; + let envelope_store = SqliteEnvelopeStore::new(db, Duration::from_millis(100)); + let mut stack = SqliteEnvelopeStack::new( + envelope_store, + 2, + 2, + ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), + ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), + true, + ); + + // We pop with no elements. + assert!(stack.pop().await.unwrap().is_none()); + } + + #[tokio::test] + async fn test_push_below_threshold_and_pop() { + let db = setup_db(true).await; + let envelope_store = SqliteEnvelopeStore::new(db, Duration::from_millis(100)); + let mut stack = SqliteEnvelopeStack::new( + envelope_store, + 5, + 2, + ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), + ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), + true, + ); + + let envelopes = mock_envelopes(5); + + // We push 5 envelopes. + for envelope in envelopes.clone() { + assert!(stack.push(envelope).await.is_ok()); + } + assert_eq!(stack.batches_buffer_size, 5); + + // We peek the top element. + let peeked_envelope = stack.peek().await.unwrap().unwrap(); + assert_eq!( + peeked_envelope.event_id().unwrap(), + envelopes.clone()[4].event_id().unwrap() + ); + + // We pop 5 envelopes.
+ for envelope in envelopes.iter().rev() { + let popped_envelope = stack.pop().await.unwrap().unwrap(); + assert_eq!( + popped_envelope.event_id().unwrap(), + envelope.event_id().unwrap() + ); + } + } + + #[tokio::test] + async fn test_push_above_threshold_and_pop() { + let db = setup_db(true).await; + let envelope_store = SqliteEnvelopeStore::new(db, Duration::from_millis(100)); + let mut stack = SqliteEnvelopeStack::new( + envelope_store, + 5, + 2, + ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), + ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), + true, + ); + + let envelopes = mock_envelopes(15); + + // We push 15 envelopes. + for envelope in envelopes.clone() { + assert!(stack.push(envelope).await.is_ok()); + } + assert_eq!(stack.batches_buffer_size, 10); + + // We peek the top element. + let peeked_envelope = stack.peek().await.unwrap().unwrap(); + assert_eq!( + peeked_envelope.event_id().unwrap(), + envelopes.clone()[14].event_id().unwrap() + ); + + // We pop 10 envelopes, and we expect that the last 10 are in memory, since the first 5 + // should have been spooled to disk. + for envelope in envelopes[5..15].iter().rev() { + let popped_envelope = stack.pop().await.unwrap().unwrap(); + assert_eq!( + popped_envelope.event_id().unwrap(), + envelope.event_id().unwrap() + ); + } + assert_eq!(stack.batches_buffer_size, 0); + + // We peek the top element, which since the buffer is empty should result in a disk load. + let peeked_envelope = stack.peek().await.unwrap().unwrap(); + assert_eq!( + peeked_envelope.event_id().unwrap(), + envelopes.clone()[4].event_id().unwrap() + ); + + // We insert a new envelope, to test the load from disk happening during `peek()` gives + // priority to this envelope in the stack. + let envelope = mock_envelope(Instant::now()); + assert!(stack.push(envelope.clone()).await.is_ok()); + + // We pop and expect the newly inserted element. 
+ let popped_envelope = stack.pop().await.unwrap().unwrap(); + assert_eq!( + popped_envelope.event_id().unwrap(), + envelope.event_id().unwrap() + ); + + // We pop 5 envelopes, which should not result in a disk load since `peek()` already should + // have caused it. + for envelope in envelopes[0..5].iter().rev() { + let popped_envelope = stack.pop().await.unwrap().unwrap(); + assert_eq!( + popped_envelope.event_id().unwrap(), + envelope.event_id().unwrap() + ); + } + assert_eq!(stack.batches_buffer_size, 0); + } + + #[tokio::test] + async fn test_drain() { + let db = setup_db(true).await; + let envelope_store = SqliteEnvelopeStore::new(db, Duration::from_millis(100)); + let mut stack = SqliteEnvelopeStack::new( + envelope_store.clone(), + 5, + 1, + ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), + ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), + true, + ); + + let envelopes = mock_envelopes(5); + + // We push 5 envelopes and check that there is nothing on disk. + for envelope in envelopes.clone() { + assert!(stack.push(envelope).await.is_ok()); + } + assert_eq!(stack.batches_buffer_size, 5); + assert_eq!(envelope_store.total_count().await.unwrap(), 0); + + // We drain the stack and make sure nothing was spooled to disk. 
+ let drained_envelopes = stack.flush(); + assert_eq!(drained_envelopes.into_iter().collect::>().len(), 5); + assert_eq!(envelope_store.total_count().await.unwrap(), 0); + } +} diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs index 29c2ec10bdc..d732452763b 100644 --- a/relay-server/src/services/buffer/mod.rs +++ b/relay-server/src/services/buffer/mod.rs @@ -29,21 +29,21 @@ use crate::utils::ManagedEnvelope; use crate::MemoryChecker; use crate::MemoryStat; +pub use envelope_buffer::EnvelopeBufferError; // pub for benchmarks -pub use common::{EnvelopeBufferError, ProjectKeyPair}; +pub use envelope_buffer::PolymorphicEnvelopeBuffer; // pub for benchmarks -pub use envelope_buffer::EnvelopeBuffer as EnvelopeBufferImpl; +pub use envelope_stack::sqlite::SqliteEnvelopeStack; // pub for benchmarks -pub use envelope_repository::sqlite::SqliteEnvelopeRepository; -// pub for benchmarks -pub use envelope_repository::EnvelopeRepository; +pub use envelope_stack::EnvelopeStack; // pub for benchmarks pub use envelope_store::sqlite::SqliteEnvelopeStore; mod common; mod envelope_buffer; -mod envelope_repository; +mod envelope_stack; mod envelope_store; +mod stack_provider; mod testutils; /// Message interface for [`EnvelopeBufferService`]. @@ -155,7 +155,7 @@ impl EnvelopeBufferService { /// Wait for the configured amount of time and make sure the project cache is ready to receive. async fn ready_to_pop( &mut self, - buffer: &EnvelopeBufferImpl, + buffer: &PolymorphicEnvelopeBuffer, dequeue: bool, ) -> Option> { relay_statsd::metric!( @@ -194,7 +194,7 @@ impl EnvelopeBufferService { /// - We should not pop from disk into memory when relay's overall memory capacity /// has been reached. /// - We need a valid global config to unspool. 
- async fn system_ready(&self, buffer: &EnvelopeBufferImpl, dequeue: bool) { + async fn system_ready(&self, buffer: &PolymorphicEnvelopeBuffer, dequeue: bool) { loop { // We should not unspool from external storage if memory capacity has been reached. // But if buffer storage is in memory, unspooling can reduce memory usage. @@ -216,7 +216,7 @@ impl EnvelopeBufferService { /// Tries to pop an envelope for a ready project. async fn try_pop<'a>( config: &Config, - buffer: &mut EnvelopeBufferImpl, + buffer: &mut PolymorphicEnvelopeBuffer, services: &Services, envelopes_tx_permit: Permit<'a, DequeuedEnvelope>, ) -> Result { @@ -304,7 +304,7 @@ impl EnvelopeBufferService { managed_envelope.reject(Outcome::Invalid(DiscardReason::Timestamp)); } - async fn handle_message(buffer: &mut EnvelopeBufferImpl, message: EnvelopeBuffer) { + async fn handle_message(buffer: &mut PolymorphicEnvelopeBuffer, message: EnvelopeBuffer) { match message { EnvelopeBuffer::Push(envelope) => { // NOTE: This function assumes that a project state update for the relevant @@ -333,12 +333,12 @@ impl EnvelopeBufferService { }; } - async fn handle_shutdown(buffer: &mut EnvelopeBufferImpl, message: Shutdown) -> bool { + async fn handle_shutdown(buffer: &mut PolymorphicEnvelopeBuffer, message: Shutdown) -> bool { // We gracefully shut down only if the shutdown has a timeout. 
if let Some(shutdown_timeout) = message.timeout { relay_log::trace!("EnvelopeBufferService: shutting down gracefully"); - let shutdown_result = timeout(shutdown_timeout, buffer.flush()).await; + let shutdown_result = timeout(shutdown_timeout, buffer.shutdown()).await; match shutdown_result { Ok(shutdown_result) => { return shutdown_result; @@ -355,7 +355,7 @@ impl EnvelopeBufferService { false } - async fn push(buffer: &mut EnvelopeBufferImpl, envelope: Box) { + async fn push(buffer: &mut PolymorphicEnvelopeBuffer, envelope: Box) { if let Err(e) = buffer.push(envelope).await { relay_log::error!( error = &e as &dyn std::error::Error, @@ -364,7 +364,7 @@ impl EnvelopeBufferService { } } - fn update_observable_state(&self, buffer: &mut EnvelopeBufferImpl) { + fn update_observable_state(&self, buffer: &mut PolymorphicEnvelopeBuffer) { self.has_capacity .store(buffer.has_capacity(), Ordering::Relaxed); } @@ -383,7 +383,7 @@ impl Service for EnvelopeBufferService { let dequeue1 = dequeue.clone(); tokio::spawn(async move { - let buffer = EnvelopeBufferImpl::from_config(&config, memory_checker).await; + let buffer = PolymorphicEnvelopeBuffer::from_config(&config, memory_checker).await; let mut buffer = match buffer { Ok(buffer) => buffer, @@ -410,22 +410,10 @@ impl Service for EnvelopeBufferService { let mut sleep = Duration::MAX; tokio::select! { - biased; - - shutdown = shutdown.notified() => { - // In case the shutdown was handled, we break out of the loop signaling that - // there is no need to process anymore envelopes. - if Self::handle_shutdown(&mut buffer, shutdown).await { - break; - } - } - Ok(()) = global_config_rx.changed() => { - sleep = Duration::ZERO; - } - Some(message) = rx.recv() => { - Self::handle_message(&mut buffer, message).await; - sleep = Duration::ZERO; - } + // NOTE: we do not select a bias here. + // On the one hand, we might want to prioritize dequeuing over enqueuing + // so we do not exceed the buffer capacity by starving the dequeue. 
+ // on the other hand, prioritizing old messages violates the LIFO design. Some(permit) = self.ready_to_pop(&buffer, dequeue.load(Ordering::Relaxed)) => { match Self::try_pop(&config, &mut buffer, &services, permit).await { Ok(new_sleep) => { @@ -439,6 +427,20 @@ impl Service for EnvelopeBufferService { } } } + Some(message) = rx.recv() => { + Self::handle_message(&mut buffer, message).await; + sleep = Duration::ZERO; + } + shutdown = shutdown.notified() => { + // In case the shutdown was handled, we break out of the loop signaling that + // there is no need to process anymore envelopes. + if Self::handle_shutdown(&mut buffer, shutdown).await { + break; + } + } + Ok(()) = global_config_rx.changed() => { + sleep = Duration::ZERO; + } else => break, } diff --git a/relay-server/src/services/buffer/stack_provider/memory.rs b/relay-server/src/services/buffer/stack_provider/memory.rs new file mode 100644 index 00000000000..230db32b340 --- /dev/null +++ b/relay-server/src/services/buffer/stack_provider/memory.rs @@ -0,0 +1,52 @@ +use crate::services::buffer::common::ProjectKeyPair; +use crate::services::buffer::envelope_stack::memory::MemoryEnvelopeStack; +use crate::services::buffer::stack_provider::{ + InitializationState, StackCreationType, StackProvider, +}; +use crate::utils::MemoryChecker; +use crate::EnvelopeStack; + +#[derive(Debug)] +pub struct MemoryStackProvider { + memory_checker: MemoryChecker, +} + +impl MemoryStackProvider { + /// Creates a new [`MemoryStackProvider`] with a given [`MemoryChecker`] that is used to + /// estimate the capacity. 
+ pub fn new(memory_checker: MemoryChecker) -> Self { + Self { memory_checker } + } +} + +impl StackProvider for MemoryStackProvider { + type Stack = MemoryEnvelopeStack; + + async fn initialize(&self) -> InitializationState { + InitializationState::empty() + } + + fn create_stack(&self, _: StackCreationType, _: ProjectKeyPair) -> Self::Stack { + MemoryEnvelopeStack::new() + } + + fn has_store_capacity(&self) -> bool { + self.memory_checker.check_memory().has_capacity() + } + + async fn store_total_count(&self) -> u64 { + // The memory implementation doesn't have a store, so the count is 0. + 0 + } + + fn stack_type<'a>(&self) -> &'a str { + "memory" + } + + async fn flush(&mut self, envelope_stacks: impl IntoIterator) { + for envelope_stack in envelope_stacks { + // The flushed envelopes will be immediately dropped. + let _ = envelope_stack.flush(); + } + } +} diff --git a/relay-server/src/services/buffer/stack_provider/mod.rs b/relay-server/src/services/buffer/stack_provider/mod.rs new file mode 100644 index 00000000000..715d70c436c --- /dev/null +++ b/relay-server/src/services/buffer/stack_provider/mod.rs @@ -0,0 +1,69 @@ +use crate::services::buffer::common::ProjectKeyPair; +use crate::EnvelopeStack; +use hashbrown::HashSet; +use std::future::Future; + +pub mod memory; +pub mod sqlite; + +/// State of the initialization of the [`StackProvider`]. +/// +/// This state is necessary for initializing resources whenever a [`StackProvider`] is used. +#[derive(Debug)] +pub struct InitializationState { + pub project_key_pairs: HashSet, +} + +impl InitializationState { + /// Create a new [`InitializationState`]. + pub fn new(project_key_pairs: HashSet) -> Self { + Self { project_key_pairs } + } + + /// Creates a new empty [`InitializationState`]. + pub fn empty() -> Self { + Self { + project_key_pairs: HashSet::new(), + } + } +} + +/// The creation type for the [`EnvelopeStack`]. 
+pub enum StackCreationType { + /// An [`EnvelopeStack`] that is created during initialization. + Initialization, + /// An [`EnvelopeStack`] that is created when an envelope is received. + New, +} + +/// A provider of [`EnvelopeStack`] instances that is responsible for creating them. +pub trait StackProvider: std::fmt::Debug { + /// The implementation of [`EnvelopeStack`] that this manager creates. + type Stack: EnvelopeStack; + + /// Initializes the [`StackProvider`]. + fn initialize(&self) -> impl Future; + + /// Creates an [`EnvelopeStack`]. + fn create_stack( + &self, + stack_creation_type: StackCreationType, + project_key_pair: ProjectKeyPair, + ) -> Self::Stack; + + /// Returns `true` if the store used by this [`StackProvider`] has space to add new + /// stacks or items to the stacks. + fn has_store_capacity(&self) -> bool; + + /// Returns the total count of the store used by this [`StackProvider`]. + fn store_total_count(&self) -> impl Future; + + /// Returns the string representation of the stack type offered by this [`StackProvider`]. + fn stack_type<'a>(&self) -> &'a str; + + /// Flushes the supplied [`EnvelopeStack`]s and consumes the [`StackProvider`]. 
+ fn flush( + &mut self, + envelope_stacks: impl IntoIterator, + ) -> impl Future; +} diff --git a/relay-server/src/services/buffer/stack_provider/sqlite.rs b/relay-server/src/services/buffer/stack_provider/sqlite.rs new file mode 100644 index 00000000000..15e4be6c4e7 --- /dev/null +++ b/relay-server/src/services/buffer/stack_provider/sqlite.rs @@ -0,0 +1,206 @@ +use std::error::Error; + +use relay_config::Config; + +use crate::services::buffer::common::ProjectKeyPair; +use crate::services::buffer::envelope_store::sqlite::{ + SqliteEnvelopeStore, SqliteEnvelopeStoreError, +}; +use crate::services::buffer::stack_provider::{ + InitializationState, StackCreationType, StackProvider, +}; +use crate::statsd::RelayTimers; +use crate::{Envelope, EnvelopeStack, SqliteEnvelopeStack}; + +#[derive(Debug)] +pub struct SqliteStackProvider { + envelope_store: SqliteEnvelopeStore, + disk_batch_size: usize, + max_batches: usize, + max_disk_size: usize, + drain_batch_size: usize, +} + +#[warn(dead_code)] +impl SqliteStackProvider { + /// Creates a new [`SqliteStackProvider`] from the provided [`Config`]. + pub async fn new(config: &Config) -> Result { + let envelope_store = SqliteEnvelopeStore::prepare(config).await?; + Ok(Self { + envelope_store, + disk_batch_size: config.spool_envelopes_stack_disk_batch_size(), + max_batches: config.spool_envelopes_stack_max_batches(), + max_disk_size: config.spool_envelopes_max_disk_size(), + drain_batch_size: config.spool_envelopes_stack_disk_batch_size(), + }) + } + + /// Inserts the supplied [`Envelope`]s in the database. 
+ #[allow(clippy::vec_box)] + async fn drain_many(&mut self, envelopes: Vec>) { + if let Err(error) = self + .envelope_store + .insert_many( + envelopes + .into_iter() + .filter_map(|e| e.as_ref().try_into().ok()), + ) + .await + { + relay_log::error!( + error = &error as &dyn Error, + "failed to drain the envelope stacks, some envelopes might be lost", + ); + } + } + + /// Returns `true` when there might be data residing on disk, `false` otherwise. + fn assume_data_on_disk(stack_creation_type: StackCreationType) -> bool { + matches!(stack_creation_type, StackCreationType::Initialization) + } +} + +impl StackProvider for SqliteStackProvider { + type Stack = SqliteEnvelopeStack; + + async fn initialize(&self) -> InitializationState { + match self.envelope_store.project_key_pairs().await { + Ok(project_key_pairs) => InitializationState::new(project_key_pairs), + Err(error) => { + relay_log::error!( + error = &error as &dyn Error, + "failed to initialize the sqlite stack provider" + ); + InitializationState::empty() + } + } + } + + fn create_stack( + &self, + stack_creation_type: StackCreationType, + project_key_pair: ProjectKeyPair, + ) -> Self::Stack { + SqliteEnvelopeStack::new( + self.envelope_store.clone(), + self.disk_batch_size, + self.max_batches, + project_key_pair.own_key, + project_key_pair.sampling_key, + // We want to check the disk by default if we are creating the stack for the first time, + // since we might have some data on disk. + // On the other hand, if we are recreating a stack, it means that we popped it because + // it was empty, or we never had data on disk for that stack, so we assume by default + // that there is no need to check disk until some data is spooled. 
+ Self::assume_data_on_disk(stack_creation_type), + ) + } + + fn has_store_capacity(&self) -> bool { + (self.envelope_store.usage() as usize) < self.max_disk_size + } + + async fn store_total_count(&self) -> u64 { + self.envelope_store + .total_count() + .await + .unwrap_or_else(|error| { + relay_log::error!( + error = &error as &dyn Error, + "failed to get the total count of envelopes for the sqlite envelope store", + ); + // In case we have an error, we default to communicating a total count of 0. + 0 + }) + } + + fn stack_type<'a>(&self) -> &'a str { + "sqlite" + } + + async fn flush(&mut self, envelope_stacks: impl IntoIterator) { + relay_log::trace!("Flushing sqlite envelope buffer"); + + relay_statsd::metric!(timer(RelayTimers::BufferDrain), { + let mut envelopes = Vec::with_capacity(self.drain_batch_size); + for envelope_stack in envelope_stacks { + for envelope in envelope_stack.flush() { + if envelopes.len() >= self.drain_batch_size { + self.drain_many(envelopes).await; + envelopes = Vec::with_capacity(self.drain_batch_size); + } + + envelopes.push(envelope); + } + } + + if !envelopes.is_empty() { + self.drain_many(envelopes).await; + } + }); + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use relay_base_schema::project::ProjectKey; + use relay_config::Config; + use uuid::Uuid; + + use crate::services::buffer::common::ProjectKeyPair; + use crate::services::buffer::stack_provider::sqlite::SqliteStackProvider; + use crate::services::buffer::stack_provider::{StackCreationType, StackProvider}; + use crate::services::buffer::testutils::utils::mock_envelopes; + use crate::EnvelopeStack; + + fn mock_config() -> Arc { + let path = std::env::temp_dir() + .join(Uuid::new_v4().to_string()) + .into_os_string() + .into_string() + .unwrap(); + + Config::from_json_value(serde_json::json!({ + "spool": { + "envelopes": { + "path": path, + "disk_batch_size": 100, + "max_batches": 1, + } + } + })) + .unwrap() + .into() + } + + #[tokio::test] + async fn 
test_flush() { + let config = mock_config(); + let mut stack_provider = SqliteStackProvider::new(&config).await.unwrap(); + + let own_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(); + let sampling_key = ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(); + + let mut envelope_stack = stack_provider.create_stack( + StackCreationType::New, + ProjectKeyPair::new(own_key, sampling_key), + ); + + let envelopes = mock_envelopes(10); + for envelope in envelopes { + envelope_stack.push(envelope).await.unwrap(); + } + + let envelope_store = stack_provider.envelope_store.clone(); + + // We make sure that no data is on disk since we will spool when more than 100 elements are + // in the in-memory stack. + assert_eq!(envelope_store.total_count().await.unwrap(), 0); + + // We drain the stack provider, and we expect all in-memory envelopes to be spooled to disk. + stack_provider.flush(vec![envelope_stack]).await; + assert_eq!(envelope_store.total_count().await.unwrap(), 10); + } +} diff --git a/relay-server/src/services/buffer/testutils.rs b/relay-server/src/services/buffer/testutils.rs index 41fb750b934..277bbe58f58 100644 --- a/relay-server/src/services/buffer/testutils.rs +++ b/relay-server/src/services/buffer/testutils.rs @@ -55,15 +55,13 @@ pub mod utils { RequestMeta::new(dsn) } - pub fn mock_envelope(instant: Instant, sampling_key: Option) -> Box { + pub fn mock_envelope(instant: Instant) -> Box { let event_id = EventId::new(); let mut envelope = Envelope::from_request(Some(event_id), request_meta()); - let public_key = - sampling_key.unwrap_or(ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap()); let dsc = DynamicSamplingContext { trace_id: Uuid::new_v4(), - public_key, + public_key: ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), release: Some("1.1.1".to_string()), user: Default::default(), replay_id: None, @@ -86,23 +84,7 @@ pub mod utils { pub fn mock_envelopes(count: usize) -> Vec> { let instant = 
Instant::now(); (0..count) - .map(|i| mock_envelope(instant - Duration::from_secs((count - i) as u64), None)) - .collect() - } - - #[allow(clippy::vec_box)] - pub fn mock_envelopes_for_project( - count: usize, - sampling_key: ProjectKey, - ) -> Vec> { - let instant = Instant::now(); - (0..count) - .map(|i| { - mock_envelope( - instant - Duration::from_secs((count - i) as u64), - Some(sampling_key), - ) - }) + .map(|i| mock_envelope(instant - Duration::from_secs((count - i) as u64))) .collect() } } diff --git a/relay-server/src/statsd.rs b/relay-server/src/statsd.rs index fb7489261e5..fbae91d7790 100644 --- a/relay-server/src/statsd.rs +++ b/relay-server/src/statsd.rs @@ -181,8 +181,6 @@ pub enum RelayHistograms { /// Number of envelopes in the backpressure buffer between the envelope buffer /// and the project cache. BufferBackpressureEnvelopesCount, - /// Number of envelopes in the buffer per key pair of projects. - BufferInMemoryEnvelopesPerKeyPair, /// The number of batches emitted per partition. BatchesPerPartition, /// The number of buckets in a batch emitted. @@ -311,9 +309,6 @@ impl HistogramMetric for RelayHistograms { RelayHistograms::BufferBackpressureEnvelopesCount => { "buffer.backpressure_envelopes_count" } - RelayHistograms::BufferInMemoryEnvelopesPerKeyPair => { - "buffer.in_memory_envelopes_per_key_pair" - } RelayHistograms::ProjectStatePending => "project_state.pending", RelayHistograms::ProjectStateAttempts => "project_state.attempts", RelayHistograms::ProjectStateRequestBatchSize => "project_state.request.batch_size", @@ -541,8 +536,8 @@ pub enum RelayTimers { BufferPeek, /// Timing in milliseconds for the time it takes for the buffer to pop. BufferPop, - /// Timing in milliseconds for the time it takes for the buffer to flush its envelopes. - BufferFlush, + /// Timing in milliseconds for the time it takes for the buffer to drain its envelopes. 
+ BufferDrain, } impl TimerMetric for RelayTimers { @@ -590,7 +585,7 @@ impl TimerMetric for RelayTimers { RelayTimers::BufferPush => "buffer.push.duration", RelayTimers::BufferPeek => "buffer.peek.duration", RelayTimers::BufferPop => "buffer.pop.duration", - RelayTimers::BufferFlush => "buffer.flush.duration", + RelayTimers::BufferDrain => "buffer.drain.duration", } } } @@ -641,7 +636,8 @@ pub enum RelayCounters { /// This happens when the envelope buffer falsely assumes that the envelope's projects are loaded /// in the cache and sends the envelope onward, even though the project cache cannot handle it. BufferEnvelopesReturned, - /// Number of times a project key pair is popped from the envelope provider. + /// Number of times an envelope stack is popped from the priority queue of stacks in the + /// envelope buffer. BufferEnvelopeStacksPopped, /// Number of times an envelope from the buffer is trying to be popped. BufferTryPop, diff --git a/tests/integration/test_healthchecks.py b/tests/integration/test_healthchecks.py index 5d5a7f87c22..69063a371f2 100644 --- a/tests/integration/test_healthchecks.py +++ b/tests/integration/test_healthchecks.py @@ -188,7 +188,7 @@ def test_readiness_disk_spool(mini_sentry, relay): # Second sent event can trigger error on the relay size, since the spool is full now. for _ in range(20): - # It takes ~10 events to make SQLite use more pages. + # It takes ~10 events to make SQLlite use more pages. try: relay.send_event(project_key) except HTTPError as e: