Auto chain trimming #2804

Merged 1 commit on Feb 28, 2025
14 changes: 8 additions & 6 deletions crates/bifrost/src/bifrost.rs
@@ -26,7 +26,7 @@ use crate::appender::Appender;
 use crate::background_appender::BackgroundAppender;
 use crate::loglet::{LogletProvider, OperationError};
 use crate::loglet_wrapper::LogletWrapper;
-use crate::watchdog::WatchdogSender;
+use crate::watchdog::{WatchdogCommand, WatchdogSender};
 use crate::{BifrostAdmin, Error, InputRecord, LogReadStream, Result};

 /// The strategy to use when bifrost fails to append or when it observes
@@ -437,7 +437,7 @@ impl BifrostInner {
         }
     }

-    async fn get_trim_point(&self, log_id: LogId) -> Result<Lsn, Error> {
+    pub async fn get_trim_point(&self, log_id: LogId) -> Result<Lsn, Error> {
         let log_metadata = Metadata::with_current(|m| m.logs_ref());

         let log_chain = log_metadata
@@ -447,9 +447,6 @@
         let mut trim_point = None;

         // Iterate over the chain until we find the first missing trim point, return value before
-        // todo: maybe update configuration to remember trim point for the whole chain
-        // todo: support multiple segments.
-        // todo: dispatch loglet deletion in the background when entire segments are trimmed
         for segment in log_chain.iter() {
            let loglet = self.get_loglet(log_id, segment).await?;
            let loglet_specific_trim_point = loglet.get_trim_point().await?;
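The loop above (context, unchanged by this PR) folds per-segment trim points into a chain-wide one. A toy version of that fold, abstracting the loglet calls behind one `Option` per segment (the names and the `Option` encoding are illustrative, not bifrost's actual types):

```rust
/// Illustrative fold: walk segments in chain order, remember the latest
/// reported trim point, and stop at the first segment that has none.
fn chain_trim_point(segment_trim_points: &[Option<u64>]) -> Option<u64> {
    let mut trim_point = None;
    for segment in segment_trim_points {
        match segment {
            Some(lsn) => trim_point = Some(*lsn),
            None => break, // "return value before" the first missing trim point
        }
    }
    trim_point
}

fn main() {
    // Segments 1 and 2 report trim points; segment 3 does not, so the scan stops.
    assert_eq!(chain_trim_point(&[Some(10), Some(25), None, Some(99)]), Some(25));
    assert_eq!(chain_trim_point(&[None]), None);
}
```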
@@ -481,7 +478,12 @@

             loglet.trim(trim_point).await?;
         }
-        // todo: Update logs configuration to remove sealed and empty loglets
+        // todo: maybe update configuration to remember trim point for the whole chain
+        // it's okay if the watchdog is dead
+        let _ = self.watchdog.send(WatchdogCommand::LogTrimmed {
+            log_id,
+            requested_trim_point: trim_point,
+        });
         Ok(())
     }

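The new send is deliberately fire-and-forget: the trim itself has already succeeded, so a dead watchdog must not fail the call, which is why the send error is discarded. A minimal stand-alone sketch of the pattern, with a plain tokio unbounded channel standing in for `WatchdogSender` and simplified stand-ins for the surrounding types:

```rust
use tokio::sync::mpsc;

// Simplified stand-ins for restate_types' LogId and Lsn newtypes.
#[derive(Debug, Clone, Copy)]
struct LogId(u64);
#[derive(Debug, Clone, Copy)]
struct Lsn(u64);

#[derive(Debug)]
enum WatchdogCommand {
    LogTrimmed { log_id: LogId, requested_trim_point: Lsn },
}

// Trimming already succeeded by the time we notify the watchdog; if the
// watchdog has shut down, losing the notification is acceptable, so the
// send error is ignored rather than propagated.
fn notify_trimmed(tx: &mpsc::UnboundedSender<WatchdogCommand>, log_id: LogId, trim_point: Lsn) {
    let _ = tx.send(WatchdogCommand::LogTrimmed {
        log_id,
        requested_trim_point: trim_point,
    });
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::unbounded_channel();
    notify_trimmed(&tx, LogId(1), Lsn(42));
    drop(tx); // close the channel so the receive loop below terminates
    while let Some(cmd) = rx.recv().await {
        println!("watchdog received: {cmd:?}");
    }
}
```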
96 changes: 93 additions & 3 deletions crates/bifrost/src/watchdog.rs
@@ -13,11 +13,17 @@ use std::time::Duration;

 use anyhow::Context;
 use enum_map::Enum;
-use restate_core::{TaskCenter, TaskKind, cancellation_watcher};
-use restate_types::logs::metadata::ProviderKind;
 use tokio::task::JoinSet;
 use tracing::{debug, trace, warn};

+use restate_core::metadata_store::{ReadWriteError, retry_on_retryable_error};
+use restate_core::{TaskCenter, TaskKind, cancellation_watcher};
+use restate_metadata_server::ReadModifyWriteError;
+use restate_types::config::Configuration;
+use restate_types::logs::metadata::{Logs, ProviderKind};
+use restate_types::logs::{LogId, Lsn, SequenceNumber};
+use restate_types::metadata_store::keys::BIFROST_CONFIG_KEY;
+
 use crate::bifrost::BifrostInner;
 use crate::loglet::LogletProvider;

@@ -68,7 +74,6 @@ impl Watchdog {

            WatchdogCommand::WatchProvider(provider) => {
                self.live_providers.push(provider.clone());
-                // TODO: Convert to a managed background task
                let _ = TaskCenter::spawn(
                    TaskKind::BifrostBackgroundHighPriority,
                    "bifrost-provider-on-start",
@@ -78,9 +83,45 @@
                     },
                 );
             }
+            WatchdogCommand::LogTrimmed {
+                log_id,
+                requested_trim_point,
+            } => {
+                self.on_log_trim(log_id, requested_trim_point);
+            }
         }
     }

+    fn on_log_trim(&self, log_id: LogId, requested_trim_point: Lsn) {
+        let bifrost = self.inner.clone();
+        let _ = TaskCenter::spawn_child(
+            TaskKind::BifrostBackgroundLowPriority,
+            format!("trim-chain-{log_id}"),
+            async move {
+                let trim_point = bifrost
+                    .get_trim_point(log_id)
+                    .await
+                    .context("cannot determine trim point")?;
+                if trim_point == Lsn::INVALID {
+                    return Ok(());
+                }
+                // todo(asoli): Notify providers about trimmed loglets for pruning.
+                let opts = Configuration::pinned();
+                retry_on_retryable_error(opts.common.network_error_retry_policy.clone(), || {
+                    trim_chain_if_needed(&bifrost, log_id, requested_trim_point, trim_point)
+                })
+                .await
+                .with_context(|| {
+                    format!(
+                        "failed trimming the log chain after trimming log {} to trim-point {}",
+                        log_id, trim_point
+                    )
+                })?;
+                Ok(())
+            },
+        );
+    }
+
     pub fn sender(&self) -> WatchdogSender {
         self.sender.clone()
     }
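`on_log_trim` runs as a detached, low-priority background task, and metadata-store errors are retried under `common.network_error_retry_policy`. For intuition, here is a sketch of what a retry-on-retryable-error helper of this shape typically does; this is an assumed illustration (`retry_transient` and the backoff numbers are made up), not `restate_core`'s implementation:

```rust
use std::future::Future;
use std::time::Duration;

/// Illustrative only: retries `op` while `is_retryable` classifies the error
/// as transient, sleeping with capped exponential backoff between attempts.
async fn retry_transient<T, E, F, Fut>(
    mut op: F,
    is_retryable: impl Fn(&E) -> bool,
) -> Result<T, E>
where
    F: FnMut() -> Fut,
    Fut: Future<Output = Result<T, E>>,
{
    let mut delay = Duration::from_millis(100);
    loop {
        match op().await {
            Ok(value) => return Ok(value),
            Err(err) if is_retryable(&err) => {
                tokio::time::sleep(delay).await;
                delay = (delay * 2).min(Duration::from_secs(5)); // cap the backoff
            }
            Err(err) => return Err(err),
        }
    }
}

#[tokio::main]
async fn main() {
    let mut attempts = 0;
    // Fails twice with a "transient" error, then succeeds on the third try.
    let result: Result<u32, &str> = retry_transient(
        || {
            attempts += 1;
            let done = attempts >= 3;
            async move { if done { Ok(42) } else { Err("transient") } }
        },
        |e| *e == "transient",
    )
    .await;
    assert_eq!(result, Ok(42));
}
```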
@@ -152,10 +193,59 @@
     }
 }

+#[derive(Debug)]
+struct AlreadyTrimmed;
+
+async fn trim_chain_if_needed(
+    bifrost: &BifrostInner,
+    log_id: LogId,
+    requested_trim_point: Lsn,
+    actual_trim_point: Lsn,
+) -> Result<(), ReadWriteError> {
+    let new_logs = bifrost
+        .metadata_writer
+        .metadata_store_client()
+        .read_modify_write(BIFROST_CONFIG_KEY.clone(), |current: Option<Logs>| {
+            let logs = current.expect("logs should be initialized by BifrostService");
+            let mut logs_builder = logs.into_builder();
+            let mut chain_builder = logs_builder.chain(log_id).expect("log id exists");
+
+            // trim_prefix's lsn is exclusive. Trim-point is inclusive of the last trimmed lsn,
+            // therefore, we need to trim _including_ the trim point.
+            chain_builder.trim_prefix(actual_trim_point.next());
+            let Some(logs) = logs_builder.build_if_modified() else {
+                // already trimmed, nothing to be done.
+                return Err(AlreadyTrimmed);
+            };
+            Ok(logs)
+        })
+        .await;
+    match new_logs {
+        Ok(logs) => {
+            bifrost.metadata_writer.submit(Arc::new(logs));
+            debug!(
+                "Log {} chain has been trimmed to trim-point {} after requesting trim to {}",
+                log_id, actual_trim_point, requested_trim_point,
+            );
+        }
+        Err(ReadModifyWriteError::FailedOperation(AlreadyTrimmed)) => {
+            // nothing to do
+        }
+        Err(ReadModifyWriteError::ReadWrite(err)) => return Err(err),
+    };
+    Ok(())
+}
+
 pub enum WatchdogCommand {
     /// Request to sync metadata if the client believes that it's outdated.
     /// i.e. attempting to write to a sealed segment.
     #[allow(dead_code)]
     ScheduleMetadataSync,
     WatchProvider(Arc<dyn LogletProvider>),
+    LogTrimmed {
+        log_id: LogId,
+        /// NOTE: This is **not** the actual trim point, this could easily be Lsn::MAX (legal)
+        /// Only used for logging, never use this value as an authoritative trim-point.
+        requested_trim_point: Lsn,
+    },
 }
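The `AlreadyTrimmed` sentinel is how the read-modify-write cycle aborts without writing: the closure returns `Ok(new_logs)` to commit a new value, or `Err(AlreadyTrimmed)` to leave the store untouched. Note also that `trim_prefix` takes an exclusive lsn, hence `actual_trim_point.next()`. A self-contained toy with the same closure contract; every type here is a hypothetical stand-in, not bifrost's actual chain model:

```rust
// Sentinel error: signals "abort the read-modify-write, nothing changed".
#[derive(Debug, PartialEq)]
struct AlreadyTrimmed;

#[derive(Debug, Clone, PartialEq)]
struct Segment {
    base_lsn: u64, // first lsn covered by this segment (inclusive)
    tail_lsn: u64, // first lsn NOT covered (exclusive)
}

#[derive(Debug, Clone, PartialEq)]
struct Chain {
    segments: Vec<Segment>,
}

impl Chain {
    // Mirrors `trim_prefix(lsn)`: `lsn` is exclusive, so to trim *through* an
    // inclusive trim point the caller passes `trim_point + 1` (the diff's
    // `actual_trim_point.next()`). Returns whether anything was removed.
    fn trim_prefix(&mut self, exclusive_lsn: u64) -> bool {
        let before = self.segments.len();
        self.segments.retain(|s| s.tail_lsn > exclusive_lsn);
        self.segments.len() != before
    }
}

// The read-modify-write closure in miniature: Ok(new) means "write this
// back" (a versioned compare-and-swap in the real store), Err(sentinel)
// means "no metadata write needed".
fn trim_chain(current: Chain, inclusive_trim_point: u64) -> Result<Chain, AlreadyTrimmed> {
    let mut chain = current;
    if chain.trim_prefix(inclusive_trim_point + 1) {
        Ok(chain)
    } else {
        Err(AlreadyTrimmed)
    }
}

fn main() {
    let chain = Chain {
        segments: vec![
            Segment { base_lsn: 0, tail_lsn: 100 },
            Segment { base_lsn: 100, tail_lsn: 200 },
        ],
    };
    // Trimming through lsn 99 drops the first segment entirely...
    let trimmed = trim_chain(chain, 99).unwrap();
    // ...while a second, identical request becomes a no-op.
    assert_eq!(trim_chain(trimmed, 99), Err(AlreadyTrimmed));
}
```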
2 changes: 1 addition & 1 deletion crates/core/src/task_center/task_kind.rs
@@ -116,7 +116,7 @@ pub enum TaskKind {
     /// shutdown on errors and the system will wait for its graceful cancellation on shutdown.
     #[strum(props(runtime = "default"))]
     BifrostBackgroundHighPriority,
-    #[strum(props(OnCancel = "abort", runtime = "default"))]
+    #[strum(props(OnCancel = "abort", runtime = "default", OnError = "log"))]
     BifrostBackgroundLowPriority,
     /// A background appender. The task will log on errors but the system will wait for its
     /// graceful cancellation on shutdown.
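The new `OnError = "log"` prop rides on strum's `EnumProperty` derive; judging by the surrounding doc comments, the task center reads it so that a failing `BifrostBackgroundLowPriority` task is logged rather than treated as fatal, though that interpretation is restate-internal. A minimal illustration of the strum mechanics only (requires strum's `derive` feature):

```rust
use strum::EnumProperty as _; // brings `get_str` into scope

#[derive(strum::EnumProperty)]
enum TaskKind {
    #[strum(props(runtime = "default"))]
    BifrostBackgroundHighPriority,
    #[strum(props(OnCancel = "abort", runtime = "default", OnError = "log"))]
    BifrostBackgroundLowPriority,
}

fn main() {
    let kind = TaskKind::BifrostBackgroundLowPriority;
    assert_eq!(kind.get_str("OnError"), Some("log"));
    assert_eq!(kind.get_str("OnCancel"), Some("abort"));
    // Unset props read back as None, which is how defaults kick in.
    assert_eq!(TaskKind::BifrostBackgroundHighPriority.get_str("OnError"), None);
}
```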