Skip to content

Commit

Permalink
ref(project-cache): Schedule updates instead of spawning tasks (#4233)
Browse files Browse the repository at this point in the history
We can do better than spawning a large number of tasks which just wait
by using a heap to keep track of when the next task is scheduled and
`FuturesUnordered` to wait for the results from the tasks. Each task is
dispatched to a `ProjectSource`, which is very heavily IO bound and for
which we don't need parallelism.
  • Loading branch information
Dav1dde authored Nov 11, 2024
1 parent 726be3f commit 403f2bf
Show file tree
Hide file tree
Showing 8 changed files with 433 additions and 30 deletions.
2 changes: 1 addition & 1 deletion relay-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ chrono = { workspace = true, features = ["clock"] }
data-encoding = { workspace = true }
flate2 = { workspace = true }
fnv = { workspace = true }
futures = { workspace = true }
futures = { workspace = true, features = ["async-await"] }
hashbrown = { workspace = true }
hyper-util = { workspace = true }
itertools = { workspace = true }
Expand Down
37 changes: 20 additions & 17 deletions relay-server/src/services/projects/cache/service.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
use std::sync::Arc;

use futures::future::BoxFuture;
use futures::StreamExt as _;
use relay_base_schema::project::ProjectKey;
use relay_config::Config;
use relay_statsd::metric;
use relay_system::Service;
use tokio::sync::{broadcast, mpsc};
use tokio::sync::broadcast;

use crate::services::projects::cache::handle::ProjectCacheHandle;
use crate::services::projects::cache::state::{CompletedFetch, Fetch, ProjectStore};
use crate::services::projects::project::ProjectState;
use crate::services::projects::source::ProjectSource;
use crate::statsd::{RelayGauges, RelayTimers};
use crate::utils::FuturesScheduled;

/// Size of the broadcast channel for project events.
///
Expand Down Expand Up @@ -66,24 +69,21 @@ pub struct ProjectCacheService {
source: ProjectSource,
config: Arc<Config>,

project_update_rx: mpsc::UnboundedReceiver<CompletedFetch>,
project_update_tx: mpsc::UnboundedSender<CompletedFetch>,
scheduled_fetches: FuturesScheduled<BoxFuture<'static, CompletedFetch>>,

project_events_tx: broadcast::Sender<ProjectChange>,
}

impl ProjectCacheService {
/// Creates a new [`ProjectCacheService`].
pub fn new(config: Arc<Config>, source: ProjectSource) -> Self {
let (project_update_tx, project_update_rx) = mpsc::unbounded_channel();
let project_events_tx = broadcast::channel(PROJECT_EVENTS_CHANNEL_SIZE).0;

Self {
store: ProjectStore::default(),
source,
config,
project_update_rx,
project_update_tx,
scheduled_fetches: FuturesScheduled::default(),
project_events_tx,
}
}
Expand All @@ -106,14 +106,12 @@ impl ProjectCacheService {
handle
}

/// Schedules a new [`Fetch`] and delivers the result to the [`Self::project_update_tx`] channel.
fn schedule_fetch(&self, fetch: Fetch) {
/// Schedules a new [`Fetch`] in [`Self::scheduled_fetches`].
fn schedule_fetch(&mut self, fetch: Fetch) {
let source = self.source.clone();
let project_updates = self.project_update_tx.clone();

tokio::spawn(async move {
tokio::time::sleep_until(fetch.when()).await;

let when = fetch.when();
let task = async move {
let state = match source
.fetch(fetch.project_key(), false, fetch.revision())
.await
Expand All @@ -132,8 +130,13 @@ impl ProjectCacheService {
}
};

let _ = project_updates.send(fetch.complete(state));
});
fetch.complete(state)
};
self.scheduled_fetches.schedule(when, Box::pin(task));

metric!(
gauge(RelayGauges::ProjectCacheScheduledFetches) = self.scheduled_fetches.len() as u64
);
}
}

Expand Down Expand Up @@ -207,9 +210,9 @@ impl relay_system::Service for ProjectCacheService {
tokio::select! {
biased;

Some(update) = self.project_update_rx.recv() => timed!(
"project_update",
self.handle_completed_fetch(update)
Some(fetch) = self.scheduled_fetches.next() => timed!(
"completed_fetch",
self.handle_completed_fetch(fetch)
),
Some(message) = rx.recv() => timed!(
message.variant(),
Expand Down
27 changes: 15 additions & 12 deletions relay-server/src/services/projects/cache/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ impl ProjectRef<'_> {
#[derive(Debug)]
pub struct Fetch {
project_key: ProjectKey,
when: Instant,
when: Option<Instant>,
revision: Revision,
}

Expand All @@ -290,9 +290,9 @@ impl Fetch {

/// Returns when the fetch for the project should be scheduled.
///
/// This can be now (as soon as possible) or a later point in time, if the project is currently
/// in a backoff.
pub fn when(&self) -> Instant {
/// This can be now (as soon as possible, indicated by `None`) or a later point in time,
/// if the project is currently in a backoff.
pub fn when(&self) -> Option<Instant> {
self.when
}

Expand Down Expand Up @@ -462,7 +462,7 @@ impl PrivateProjectState {
}
FetchState::Pending { next_fetch_attempt } => {
// Schedule a new fetch, even if there is a backoff, it will just be sleeping for a while.
next_fetch_attempt.unwrap_or(now)
*next_fetch_attempt
}
FetchState::Complete { last_fetch } => {
if last_fetch.check_expiry(now, config).is_fresh() {
Expand All @@ -473,7 +473,7 @@ impl PrivateProjectState {
);
return None;
}
now
None
}
};

Expand All @@ -484,7 +484,7 @@ impl PrivateProjectState {
tags.project_key = &self.project_key.as_str(),
attempts = self.backoff.attempt() + 1,
"project state fetch scheduled in {:?}",
when.saturating_duration_since(Instant::now()),
when.unwrap_or(now).saturating_duration_since(now),
);

Some(Fetch {
Expand All @@ -501,9 +501,12 @@ impl PrivateProjectState {
);

if fetch.is_pending() {
self.state = FetchState::Pending {
next_fetch_attempt: now.checked_add(self.backoff.next_backoff()),
let next_backoff = self.backoff.next_backoff();
let next_fetch_attempt = match next_backoff.is_zero() {
false => now.checked_add(next_backoff),
true => None,
};
self.state = FetchState::Pending { next_fetch_attempt };
relay_log::trace!(
tags.project_key = &self.project_key.as_str(),
"project state fetch completed but still pending"
Expand Down Expand Up @@ -596,7 +599,7 @@ mod tests {

let fetch = store.try_begin_fetch(project_key, &config).unwrap();
assert_eq!(fetch.project_key(), project_key);
assert!(fetch.when() < Instant::now());
assert_eq!(fetch.when(), None);
assert_eq!(fetch.revision().as_str(), None);
assert_state!(store, project_key, ProjectState::Pending);

Expand All @@ -608,7 +611,7 @@ mod tests {
let fetch = store.complete_fetch(fetch, &config).unwrap();
assert_eq!(fetch.project_key(), project_key);
// First backoff is still immediately.
assert!(fetch.when() < Instant::now());
assert_eq!(fetch.when(), None);
assert_eq!(fetch.revision().as_str(), None);
assert_state!(store, project_key, ProjectState::Pending);

Expand All @@ -617,7 +620,7 @@ mod tests {
let fetch = store.complete_fetch(fetch, &config).unwrap();
assert_eq!(fetch.project_key(), project_key);
// This time it needs to be in the future (backoff).
assert!(fetch.when() > Instant::now());
assert!(fetch.when() > Some(Instant::now()));
assert_eq!(fetch.revision().as_str(), None);
assert_state!(store, project_key, ProjectState::Pending);

Expand Down
3 changes: 3 additions & 0 deletions relay-server/src/statsd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ pub enum RelayGauges {
RedisPoolIdleConnections,
/// The number of notifications in the broadcast channel of the project cache.
ProjectCacheNotificationChannel,
/// The number of scheduled and in progress fetches in the project cache.
ProjectCacheScheduledFetches,
/// Exposes the amount of currently open and handled connections by the server.
ServerActiveConnections,
}
Expand All @@ -68,6 +70,7 @@ impl GaugeMetric for RelayGauges {
RelayGauges::ProjectCacheNotificationChannel => {
"project_cache.notification_channel.size"
}
RelayGauges::ProjectCacheScheduledFetches => "project_cache.fetches.size",
RelayGauges::ServerActiveConnections => "server.http.connections",
}
}
Expand Down
2 changes: 2 additions & 0 deletions relay-server/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ mod param_parser;
mod pick;
mod rate_limits;
mod retry;
mod scheduled;
mod sizes;
mod sleep_handle;
mod split_off;
Expand All @@ -30,6 +31,7 @@ pub use self::param_parser::*;
pub use self::pick::*;
pub use self::rate_limits::*;
pub use self::retry::*;
pub use self::scheduled::*;
pub use self::serde::*;
pub use self::sizes::*;
pub use self::sleep_handle::*;
Expand Down
Loading

0 comments on commit 403f2bf

Please sign in to comment.