diff --git a/crates/bifrost/src/bifrost.rs b/crates/bifrost/src/bifrost.rs
index a3ebcf377..228b71981 100644
--- a/crates/bifrost/src/bifrost.rs
+++ b/crates/bifrost/src/bifrost.rs
@@ -26,7 +26,7 @@ use crate::appender::Appender;
 use crate::background_appender::BackgroundAppender;
 use crate::loglet::{LogletProvider, OperationError};
 use crate::loglet_wrapper::LogletWrapper;
-use crate::watchdog::WatchdogSender;
+use crate::watchdog::{WatchdogCommand, WatchdogSender};
 use crate::{BifrostAdmin, Error, InputRecord, LogReadStream, Result};
 
 /// The strategy to use when bifrost fails to append or when it observes
@@ -437,7 +437,7 @@ impl BifrostInner {
         }
     }
 
-    async fn get_trim_point(&self, log_id: LogId) -> Result<Lsn> {
+    pub async fn get_trim_point(&self, log_id: LogId) -> Result<Lsn> {
         let log_metadata = Metadata::with_current(|m| m.logs_ref());
 
         let log_chain = log_metadata
@@ -447,9 +447,6 @@ impl BifrostInner {
         let mut trim_point = None;
 
         // Iterate over the chain until we find the first missing trim point, return value before
-        // todo: maybe update configuration to remember trim point for the whole chain
-        // todo: support multiple segments.
-        // todo: dispatch loglet deletion in the background when entire segments are trimmed
         for segment in log_chain.iter() {
             let loglet = self.get_loglet(log_id, segment).await?;
             let loglet_specific_trim_point = loglet.get_trim_point().await?;
@@ -481,7 +478,12 @@ impl BifrostInner {
             loglet.trim(trim_point).await?;
         }
 
-        // todo: Update logs configuration to remove sealed and empty loglets
+        // todo: maybe update configuration to remember trim point for the whole chain
+        // it's okay if the watchdog is dead
+        let _ = self.watchdog.send(WatchdogCommand::LogTrimmed {
+            log_id,
+            requested_trim_point: trim_point,
+        });
         Ok(())
     }
 
diff --git a/crates/bifrost/src/watchdog.rs b/crates/bifrost/src/watchdog.rs
index 975e03505..27c7ccf2f 100644
--- a/crates/bifrost/src/watchdog.rs
+++ b/crates/bifrost/src/watchdog.rs
@@ -13,11 +13,17 @@ use std::time::Duration;
 
 use anyhow::Context;
 use enum_map::Enum;
-use restate_core::{TaskCenter, TaskKind, cancellation_watcher};
-use restate_types::logs::metadata::ProviderKind;
 use tokio::task::JoinSet;
 use tracing::{debug, trace, warn};
 
+use restate_core::metadata_store::{ReadWriteError, retry_on_retryable_error};
+use restate_core::{TaskCenter, TaskKind, cancellation_watcher};
+use restate_metadata_server::ReadModifyWriteError;
+use restate_types::config::Configuration;
+use restate_types::logs::metadata::{Logs, ProviderKind};
+use restate_types::logs::{LogId, Lsn, SequenceNumber};
+use restate_types::metadata_store::keys::BIFROST_CONFIG_KEY;
+
 use crate::bifrost::BifrostInner;
 use crate::loglet::LogletProvider;
 
@@ -68,7 +74,6 @@ impl Watchdog {
             WatchdogCommand::WatchProvider(provider) => {
                 self.live_providers.push(provider.clone());
 
-                // TODO: Convert to a managed background task
                 let _ = TaskCenter::spawn(
                     TaskKind::BifrostBackgroundHighPriority,
                     "bifrost-provider-on-start",
@@ -78,9 +83,45 @@
                     },
                 );
             }
+            WatchdogCommand::LogTrimmed {
+                log_id,
+                requested_trim_point,
+            } => {
+                self.on_log_trim(log_id, requested_trim_point);
+            }
         }
     }
 
+    fn on_log_trim(&self, log_id: LogId, requested_trim_point: Lsn) {
+        let bifrost = self.inner.clone();
+        let _ = TaskCenter::spawn_child(
+            TaskKind::BifrostBackgroundLowPriority,
+            format!("trim-chain-{log_id}"),
+            async move {
+                let trim_point = bifrost
+                    .get_trim_point(log_id)
+                    .await
+                    .context("cannot determine tail")?;
+                if trim_point == Lsn::INVALID {
+                    return Ok(());
+                }
+                // todo(asoli): Notify providers about trimmed loglets for pruning.
+                let opts = Configuration::pinned();
+                retry_on_retryable_error(opts.common.network_error_retry_policy.clone(), || {
+                    trim_chain_if_needed(&bifrost, log_id, requested_trim_point, trim_point)
+                })
+                .await
+                .with_context(|| {
+                    format!(
+                        "failed trimming the log chain after trimming log {} to trim-point {}",
+                        log_id, trim_point
+                    )
+                })?;
+                Ok(())
+            },
+        );
+    }
+
     pub fn sender(&self) -> WatchdogSender {
         self.sender.clone()
     }
@@ -152,10 +193,59 @@
     }
 }
 
+#[derive(Debug)]
+struct AlreadyTrimmed;
+
+async fn trim_chain_if_needed(
+    bifrost: &BifrostInner,
+    log_id: LogId,
+    requested_trim_point: Lsn,
+    actual_trim_point: Lsn,
+) -> Result<(), ReadWriteError> {
+    let new_logs = bifrost
+        .metadata_writer
+        .metadata_store_client()
+        .read_modify_write(BIFROST_CONFIG_KEY.clone(), |current: Option<Logs>| {
+            let logs = current.expect("logs should be initialized by BifrostService");
+            let mut logs_builder = logs.into_builder();
+            let mut chain_builder = logs_builder.chain(log_id).expect("log id exists");
+
+            // trim_prefix's lsn is exclusive. Trim-point is inclusive of the last trimmed lsn,
+            // therefore, we need to trim _including_ the trim point.
+            chain_builder.trim_prefix(actual_trim_point.next());
+            let Some(logs) = logs_builder.build_if_modified() else {
+                // already trimmed, nothing to be done.
+                return Err(AlreadyTrimmed);
+            };
+            Ok(logs)
+        })
+        .await;
+    match new_logs {
+        Ok(logs) => {
+            bifrost.metadata_writer.submit(Arc::new(logs));
+            debug!(
+                "Log {} chain has been trimmed to trim-point {} after requesting trim to {}",
+                log_id, actual_trim_point, requested_trim_point,
+            );
+        }
+        Err(ReadModifyWriteError::FailedOperation(AlreadyTrimmed)) => {
+            // nothing to do
+        }
+        Err(ReadModifyWriteError::ReadWrite(err)) => return Err(err),
+    };
+    Ok(())
+}
+
 pub enum WatchdogCommand {
     /// Request to sync metadata if the client believes that it's outdated.
     /// i.e. attempting to write to a sealed segment.
     #[allow(dead_code)]
     ScheduleMetadataSync,
     WatchProvider(Arc<dyn LogletProvider>),
+    LogTrimmed {
+        log_id: LogId,
+        /// NOTE: This is **not** the actual trim point, this could easily be Lsn::MAX (legal)
+        /// Only used for logging, never use this value as an authoritative trim-point.
+        requested_trim_point: Lsn,
+    },
 }
diff --git a/crates/core/src/task_center/task_kind.rs b/crates/core/src/task_center/task_kind.rs
index bfb1b5c51..13df1cd5d 100644
--- a/crates/core/src/task_center/task_kind.rs
+++ b/crates/core/src/task_center/task_kind.rs
@@ -116,7 +116,7 @@ pub enum TaskKind {
     /// shutdown on errors and the system will wait for its graceful cancellation on shutdown.
     #[strum(props(runtime = "default"))]
     BifrostBackgroundHighPriority,
-    #[strum(props(OnCancel = "abort", runtime = "default"))]
+    #[strum(props(OnCancel = "abort", runtime = "default", OnError = "log"))]
     BifrostBackgroundLowPriority,
     /// A background appender. The task will log on errors but the system will wait for its
     /// graceful cancellation on shutdown.
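The subtlest part of the diff is `trim_chain_if_needed`: the closure passed to the metadata store's read-modify-write returns a sentinel error (`AlreadyTrimmed`) when `build_if_modified()` reports no change, so no write (and no spurious metadata version bump) happens on an already-trimmed chain. Below is a minimal, self-contained sketch of that pattern; `RmwError` and the local `read_modify_write` are stand-ins for the real restate metadata-store API, not its actual signatures, and the `u64` "chain" is a toy model of the log chain's start LSN.

```rust
// Sketch of the "sentinel error means no-op" read-modify-write pattern.
// All types here are hypothetical stand-ins, not the restate API.

#[derive(Debug)]
struct AlreadyTrimmed;

#[derive(Debug)]
enum RmwError<E> {
    // Closure decided not to write (e.g. chain already trimmed).
    FailedOperation(E),
    // Underlying store failure (network, etc.).
    ReadWrite(String),
}

/// Stand-in for a metadata-store read-modify-write: apply `modify` to the
/// current value and "write back" only if the closure returns Ok.
fn read_modify_write<T, E>(
    current: Option<T>,
    modify: impl FnOnce(Option<T>) -> Result<T, E>,
) -> Result<T, RmwError<E>> {
    modify(current).map_err(RmwError::FailedOperation)
}

fn main() {
    // The chain is modeled as the first LSN it still covers.
    let chain_start: u64 = 10;
    let trim_point: u64 = 25;

    let result = read_modify_write(Some(chain_start), |current| {
        let start = current.expect("initialized");
        // trim_prefix is exclusive, so the new start is trim_point + 1.
        let new_start = trim_point + 1;
        if new_start <= start {
            // Nothing would change; signal it so no write is issued.
            return Err(AlreadyTrimmed);
        }
        Ok(new_start)
    });

    match result {
        Ok(new_start) => println!("chain trimmed, now starts at {new_start}"),
        Err(RmwError::FailedOperation(AlreadyTrimmed)) => println!("already trimmed, nothing to do"),
        Err(RmwError::ReadWrite(err)) => eprintln!("store error: {err}"),
    }
}
```

In the actual patch this whole operation is additionally wrapped in `retry_on_retryable_error`, so only genuine store errors are retried while the `AlreadyTrimmed` path exits immediately.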