Skip to content

Commit

Permalink
WIP: Introduce FindTailAttr
Browse files Browse the repository at this point in the history
Summary:
An implementation dependend find-tail-attr
  • Loading branch information
muhamadazmy committed Feb 25, 2025
1 parent 5d2020f commit ab3d293
Show file tree
Hide file tree
Showing 15 changed files with 174 additions and 76 deletions.
3 changes: 2 additions & 1 deletion crates/admin/src/cluster_controller/grpc_svc_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use bytes::{Bytes, BytesMut};
use tonic::{async_trait, Request, Response, Status};
use tracing::info;

use restate_bifrost::loglet::FindTailAttr;
use restate_bifrost::{Bifrost, Error as BiforstError};
use restate_core::{Metadata, MetadataWriter};
use restate_types::identifiers::PartitionId;
Expand Down Expand Up @@ -279,7 +280,7 @@ impl ClusterCtrlSvc for ClusterCtrlSvcHandler {
})?;

let tail_state = writable_loglet
.find_tail()
.find_tail(FindTailAttr::default())
.await
.map_err(|err| Status::internal(err.to_string()))?;

Expand Down
3 changes: 2 additions & 1 deletion crates/admin/src/cluster_controller/logs_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use tokio::sync::Semaphore;
use tokio::task::JoinSet;
use tracing::{debug, enabled, error, info, trace_span, Instrument, Level};

use restate_bifrost::loglet::FindTailAttr;
use restate_bifrost::{Bifrost, Error as BifrostError};
use restate_core::metadata_store::{Precondition, WriteError};
use restate_core::{
Expand Down Expand Up @@ -1095,7 +1096,7 @@ impl LogsController {
}

debug!(%log_id, segment_index=%writeable_loglet.segment_index(), "Attempting to find tail for loglet");
let found_tail = match writeable_loglet.find_tail().await {
let found_tail = match writeable_loglet.find_tail(FindTailAttr::Durable).await {
Ok(tail) => tail,
Err(err) => {
debug!(error=%err, %log_id, segment_index=%writeable_loglet.segment_index(), "Failed to find tail for loglet");
Expand Down
36 changes: 31 additions & 5 deletions crates/admin/src/cluster_controller/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -813,6 +813,7 @@ mod tests {
use tracing::{debug, info, warn};

use crate::cluster_controller::cluster_state_refresher::ClusterStateWatcher;
use restate_bifrost::loglet::FindTailAttr;
use restate_bifrost::providers::memory_loglet;
use restate_bifrost::{Bifrost, BifrostService, ErrorRecoveryStrategy};
use restate_core::network::{
Expand Down Expand Up @@ -980,7 +981,12 @@ mod tests {
assert_eq!(Lsn::from(i), lsn);
}
applied_lsn.store(
bifrost.find_tail(LOG_ID).await?.offset().prev().as_u64(),
bifrost
.find_tail(LOG_ID, FindTailAttr::default())
.await?
.offset()
.prev()
.as_u64(),
Ordering::Relaxed,
);

Expand Down Expand Up @@ -1057,7 +1063,12 @@ mod tests {
assert_eq!(Lsn::from(i), lsn);
}
applied_lsn.store(
bifrost.find_tail(LOG_ID).await?.offset().prev().as_u64(),
bifrost
.find_tail(LOG_ID, FindTailAttr::default())
.await?
.offset()
.prev()
.as_u64(),
Ordering::Relaxed,
);
tokio::time::sleep(interval_duration * 2).await;
Expand Down Expand Up @@ -1134,7 +1145,12 @@ mod tests {
assert_eq!(Lsn::from(i), lsn);
}
applied_lsn.store(
bifrost.find_tail(LOG_ID).await?.offset().prev().as_u64(),
bifrost
.find_tail(LOG_ID, FindTailAttr::default())
.await?
.offset()
.prev()
.as_u64(),
Ordering::Relaxed,
);
tokio::time::sleep(interval_duration * 10).await;
Expand Down Expand Up @@ -1214,7 +1230,12 @@ mod tests {
assert_eq!(Lsn::from(i), lsn);
}
applied_persisted_lsn.store(
bifrost.find_tail(LOG_ID).await?.offset().prev().as_u64(),
bifrost
.find_tail(LOG_ID, FindTailAttr::default())
.await?
.offset()
.prev()
.as_u64(),
Ordering::Relaxed,
);

Expand Down Expand Up @@ -1292,7 +1313,12 @@ mod tests {
assert_eq!(Lsn::from(i), lsn);
}
applied_lsn.store(
bifrost.find_tail(LOG_ID).await?.offset().prev().as_u64(),
bifrost
.find_tail(LOG_ID, FindTailAttr::default())
.await?
.offset()
.prev()
.as_u64(),
Ordering::Relaxed,
);
tokio::time::sleep(interval_duration * 2).await;
Expand Down
63 changes: 44 additions & 19 deletions crates/bifrost/src/bifrost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use restate_types::storage::StorageEncode;

use crate::appender::Appender;
use crate::background_appender::BackgroundAppender;
use crate::loglet::{LogletProvider, OperationError};
use crate::loglet::{FindTailAttr, LogletProvider, OperationError};
use crate::loglet_wrapper::LogletWrapper;
use crate::watchdog::WatchdogSender;
use crate::{BifrostAdmin, Error, InputRecord, LogReadStream, Result};
Expand Down Expand Up @@ -198,14 +198,15 @@ impl Bifrost {
///
/// ```no_run
/// use restate_bifrost::{Bifrost, LogReadStream};
/// use restate_bifrost::loglet::FindTailAttr;
/// use restate_types::logs::{KeyFilter, LogId, SequenceNumber};
///
/// async fn reader(bifrost: &Bifrost, log_id: LogId) -> LogReadStream {
/// bifrost.create_reader(
/// log_id,
/// KeyFilter::Any,
/// bifrost.get_trim_point(log_id).await.unwrap(),
/// bifrost.find_tail(log_id).await.unwrap().offset().prev(),
/// bifrost.find_tail(log_id, FindTailAttr::default()).await.unwrap().offset().prev(),
/// ).unwrap()
/// }
/// ```
Expand Down Expand Up @@ -261,16 +262,22 @@ impl Bifrost {
/// If the log is empty, it returns TailState::Open(Lsn::OLDEST).
/// This should never return Err(Error::LogSealed). Sealed state is represented as
/// TailState::Sealed(..)
pub async fn find_tail(&self, log_id: LogId) -> Result<TailState> {
pub async fn find_tail(&self, log_id: LogId, attrs: FindTailAttr) -> Result<TailState> {
self.inner.fail_if_shutting_down()?;
Ok(self.inner.find_tail(log_id).await?.1)
Ok(self.inner.find_tail(log_id, attrs).await?.1)
}

// Get the loglet currently serving the tail of the chain, for use in integration tests.
#[cfg(any(test, feature = "test-util"))]
pub async fn find_tail_loglet(&self, log_id: LogId) -> Result<Arc<dyn crate::loglet::Loglet>> {
self.inner.fail_if_shutting_down()?;
Ok(self.inner.find_tail(log_id).await?.0.inner().clone())
Ok(self
.inner
.find_tail(log_id, FindTailAttr::default())
.await?
.0
.inner()
.clone())
}

/// The lsn of the slot **before** the first readable record (if it exists), or the offset
Expand All @@ -290,7 +297,7 @@ impl Bifrost {

self.inner.fail_if_shutting_down()?;

let current_tail = self.find_tail(log_id).await?;
let current_tail = self.find_tail(log_id, FindTailAttr::default()).await?;

if current_tail.offset() <= Lsn::OLDEST {
return Ok(Vec::default());
Expand Down Expand Up @@ -368,7 +375,7 @@ impl BifrostInner {
from: Lsn,
) -> Result<Option<crate::LogEntry>> {
use futures::StreamExt;
let (_, tail_state) = self.find_tail(log_id).await?;
let (_, tail_state) = self.find_tail(log_id, FindTailAttr::default()).await?;
if from >= tail_state.offset() {
// Can't use this function to read future records.
return Ok(None);
Expand All @@ -385,7 +392,11 @@ impl BifrostInner {
stream.next().await.transpose()
}

pub async fn find_tail(&self, log_id: LogId) -> Result<(LogletWrapper, TailState)> {
pub async fn find_tail(
&self,
log_id: LogId,
attr: FindTailAttr,
) -> Result<(LogletWrapper, TailState)> {
let loglet = self.writeable_loglet(log_id).await?;
let start = Instant::now();
// uses the same retry policy as reads to not add too many configuration keys
Expand All @@ -396,7 +407,7 @@ impl BifrostInner {
.clone()
.into_iter();
loop {
match loglet.find_tail().await {
match loglet.find_tail(attr).await {
Ok(tail) => {
if logged {
info!(
Expand Down Expand Up @@ -680,7 +691,9 @@ mod tests {
assert_eq!(max_lsn + Lsn::from(1), lsn);
max_lsn = lsn;

let tail = bifrost.find_tail(LogId::new(0)).await?;
let tail = bifrost
.find_tail(LogId::new(0), FindTailAttr::default())
.await?;
assert_eq!(max_lsn.next(), tail.offset());

// Initiate shutdown
Expand Down Expand Up @@ -725,7 +738,13 @@ mod tests {

let bifrost = Bifrost::init_local(node_env.metadata_writer).await;

assert_eq!(Lsn::OLDEST, bifrost.find_tail(LOG_ID).await?.offset());
assert_eq!(
Lsn::OLDEST,
bifrost
.find_tail(LOG_ID, FindTailAttr::default())
.await?
.offset()
);

assert_eq!(Lsn::INVALID, bifrost.get_trim_point(LOG_ID).await?);

Expand All @@ -737,7 +756,7 @@ mod tests {

bifrost.admin().trim(LOG_ID, Lsn::from(5)).await?;

let tail = bifrost.find_tail(LOG_ID).await?;
let tail = bifrost.find_tail(LOG_ID, FindTailAttr::default()).await?;
assert_eq!(tail.offset(), Lsn::from(11));
assert!(!tail.is_sealed());
assert_eq!(Lsn::from(5), bifrost.get_trim_point(LOG_ID).await?);
Expand All @@ -759,7 +778,13 @@ mod tests {
// trimming beyond the release point will fall back to the release point
bifrost.admin().trim(LOG_ID, Lsn::MAX).await?;

assert_eq!(Lsn::from(11), bifrost.find_tail(LOG_ID).await?.offset());
assert_eq!(
Lsn::from(11),
bifrost
.find_tail(LOG_ID, FindTailAttr::default())
.await?
.offset()
);
let new_trim_point = bifrost.get_trim_point(LOG_ID).await?;
assert_eq!(Lsn::from(10), new_trim_point);

Expand Down Expand Up @@ -803,7 +828,7 @@ mod tests {

// not sealed, tail is what we expect
assert_that!(
bifrost.find_tail(LOG_ID).await?,
bifrost.find_tail(LOG_ID, FindTailAttr::default()).await?,
pat!(TailState::Open(eq(Lsn::new(6))))
);

Expand All @@ -821,7 +846,7 @@ mod tests {

// sealed, tail is what we expect
assert_that!(
bifrost.find_tail(LOG_ID).await?,
bifrost.find_tail(LOG_ID, FindTailAttr::default()).await?,
pat!(TailState::Sealed(eq(Lsn::new(6))))
);

Expand Down Expand Up @@ -873,7 +898,7 @@ mod tests {
// find_tail() on the underlying loglet returns (6) but for bifrost it should be (5) after
// the new segment was created at tail of the chain with base_lsn=5
assert_that!(
bifrost.find_tail(LOG_ID).await?,
bifrost.find_tail(LOG_ID, FindTailAttr::default()).await?,
pat!(TailState::Open(eq(Lsn::new(5))))
);

Expand All @@ -888,13 +913,13 @@ mod tests {

// tail is now 8 and open.
assert_that!(
bifrost.find_tail(LOG_ID).await?,
bifrost.find_tail(LOG_ID, FindTailAttr::default()).await?,
pat!(TailState::Open(eq(Lsn::new(8))))
);

// validating that segment 1 is still sealed and has its own tail at Lsn (6)
assert_that!(
segment_1.find_tail().await?,
segment_1.find_tail(FindTailAttr::default()).await?,
pat!(TailState::Sealed(eq(Lsn::new(6))))
);

Expand All @@ -908,7 +933,7 @@ mod tests {

// segment 2 is open and at 8 as previously validated through bifrost interface
assert_that!(
segment_2.find_tail().await?,
segment_2.find_tail(FindTailAttr::default()).await?,
pat!(TailState::Open(eq(Lsn::new(8))))
);

Expand Down
3 changes: 2 additions & 1 deletion crates/bifrost/src/bifrost_admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use restate_types::Version;

use crate::bifrost::BifrostInner;
use crate::error::AdminError;
use crate::loglet::FindTailAttr;
use crate::loglet_wrapper::LogletWrapper;
use crate::{Error, Result};

Expand Down Expand Up @@ -187,7 +188,7 @@ impl<'a> BifrostAdmin<'a> {
}
}
}
let tail = loglet.find_tail().await?;
let tail = loglet.find_tail(FindTailAttr::default()).await?;

Ok(SealedSegment {
segment_index: loglet.segment_index(),
Expand Down
18 changes: 16 additions & 2 deletions crates/bifrost/src/loglet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,15 @@ pub trait Loglet: Send + Sync {

/// The tail is *the first unwritten position* in the loglet.
///
/// Finds the durable tail of the loglet (last record offset that was durably committed) then
/// Finds the tail of the loglet (last record offset that was durably committed) then
/// it returns the offset **after** it. Virtually, the returned offset is the offset returned
/// after the next `append()` call.
///
/// If the loglet is empty, the loglet should return TailState::Open(Offset::OLDEST).
async fn find_tail(&self) -> Result<TailState<LogletOffset>, OperationError>;
async fn find_tail(
&self,
attr: FindTailAttr,
) -> Result<TailState<LogletOffset>, OperationError>;

/// The offset of the slot **before** the first readable record (if it exists), or the offset
/// before the next slot that will be written to. Must not return Self::INVALID. If the loglet
Expand All @@ -150,6 +153,17 @@ pub trait Loglet: Send + Sync {
async fn seal(&self) -> Result<(), OperationError>;
}

#[derive(Default, Clone, Copy, PartialEq, Eq)]
pub enum FindTailAttr {
/// Finds the durable tail of the loglet.
#[default]
Durable,
/// Approximate find tail of the loglet. This should cheap to run based on the
/// loglet implementation. If an efficient approximation is not possible, it must
/// fall back to a durable check.
Approximate,
}

/// A stream of log records from a single loglet. Loglet streams are _always_ tailing streams.
pub trait LogletReadStream: Stream<Item = Result<LogEntry<LogletOffset>, OperationError>> {
/// Current read pointer. This points to the next offset to be read.
Expand Down
Loading

0 comments on commit ab3d293

Please sign in to comment.