Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add FindTailAttr for configurable find_tail behavior #2781

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion crates/admin/src/cluster_controller/grpc_svc_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use bytes::{Bytes, BytesMut};
use tonic::{async_trait, Request, Response, Status};
use tracing::info;

use restate_bifrost::loglet::FindTailAttr;
use restate_bifrost::{Bifrost, Error as BiforstError};
use restate_core::{Metadata, MetadataWriter};
use restate_types::identifiers::PartitionId;
Expand Down Expand Up @@ -279,7 +280,7 @@ impl ClusterCtrlSvc for ClusterCtrlSvcHandler {
})?;

let tail_state = writable_loglet
.find_tail()
.find_tail(FindTailAttr::default())
.await
.map_err(|err| Status::internal(err.to_string()))?;

Expand Down
3 changes: 2 additions & 1 deletion crates/admin/src/cluster_controller/logs_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use tokio::sync::Semaphore;
use tokio::task::JoinSet;
use tracing::{debug, enabled, error, info, trace_span, Instrument, Level};

use restate_bifrost::loglet::FindTailAttr;
use restate_bifrost::{Bifrost, Error as BifrostError};
use restate_core::metadata_store::{Precondition, WriteError};
use restate_core::{
Expand Down Expand Up @@ -1095,7 +1096,7 @@ impl LogsController {
}

debug!(%log_id, segment_index=%writeable_loglet.segment_index(), "Attempting to find tail for loglet");
let found_tail = match writeable_loglet.find_tail().await {
let found_tail = match writeable_loglet.find_tail(FindTailAttr::Durable).await {
Ok(tail) => tail,
Err(err) => {
debug!(error=%err, %log_id, segment_index=%writeable_loglet.segment_index(), "Failed to find tail for loglet");
Expand Down
36 changes: 31 additions & 5 deletions crates/admin/src/cluster_controller/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -813,6 +813,7 @@ mod tests {
use tracing::{debug, info, warn};

use crate::cluster_controller::cluster_state_refresher::ClusterStateWatcher;
use restate_bifrost::loglet::FindTailAttr;
use restate_bifrost::providers::memory_loglet;
use restate_bifrost::{Bifrost, BifrostService, ErrorRecoveryStrategy};
use restate_core::network::{
Expand Down Expand Up @@ -980,7 +981,12 @@ mod tests {
assert_eq!(Lsn::from(i), lsn);
}
applied_lsn.store(
bifrost.find_tail(LOG_ID).await?.offset().prev().as_u64(),
bifrost
.find_tail(LOG_ID, FindTailAttr::default())
.await?
.offset()
.prev()
.as_u64(),
Ordering::Relaxed,
);

Expand Down Expand Up @@ -1057,7 +1063,12 @@ mod tests {
assert_eq!(Lsn::from(i), lsn);
}
applied_lsn.store(
bifrost.find_tail(LOG_ID).await?.offset().prev().as_u64(),
bifrost
.find_tail(LOG_ID, FindTailAttr::default())
.await?
.offset()
.prev()
.as_u64(),
Ordering::Relaxed,
);
tokio::time::sleep(interval_duration * 2).await;
Expand Down Expand Up @@ -1134,7 +1145,12 @@ mod tests {
assert_eq!(Lsn::from(i), lsn);
}
applied_lsn.store(
bifrost.find_tail(LOG_ID).await?.offset().prev().as_u64(),
bifrost
.find_tail(LOG_ID, FindTailAttr::default())
.await?
.offset()
.prev()
.as_u64(),
Ordering::Relaxed,
);
tokio::time::sleep(interval_duration * 10).await;
Expand Down Expand Up @@ -1214,7 +1230,12 @@ mod tests {
assert_eq!(Lsn::from(i), lsn);
}
applied_persisted_lsn.store(
bifrost.find_tail(LOG_ID).await?.offset().prev().as_u64(),
bifrost
.find_tail(LOG_ID, FindTailAttr::default())
.await?
.offset()
.prev()
.as_u64(),
Ordering::Relaxed,
);

Expand Down Expand Up @@ -1292,7 +1313,12 @@ mod tests {
assert_eq!(Lsn::from(i), lsn);
}
applied_lsn.store(
bifrost.find_tail(LOG_ID).await?.offset().prev().as_u64(),
bifrost
.find_tail(LOG_ID, FindTailAttr::default())
.await?
.offset()
.prev()
.as_u64(),
Ordering::Relaxed,
);
tokio::time::sleep(interval_duration * 2).await;
Expand Down
63 changes: 44 additions & 19 deletions crates/bifrost/src/bifrost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use restate_types::storage::StorageEncode;

use crate::appender::Appender;
use crate::background_appender::BackgroundAppender;
use crate::loglet::{LogletProvider, OperationError};
use crate::loglet::{FindTailAttr, LogletProvider, OperationError};
use crate::loglet_wrapper::LogletWrapper;
use crate::watchdog::WatchdogSender;
use crate::{BifrostAdmin, Error, InputRecord, LogReadStream, Result};
Expand Down Expand Up @@ -198,14 +198,15 @@ impl Bifrost {
///
/// ```no_run
/// use restate_bifrost::{Bifrost, LogReadStream};
/// use restate_bifrost::loglet::FindTailAttr;
/// use restate_types::logs::{KeyFilter, LogId, SequenceNumber};
///
/// async fn reader(bifrost: &Bifrost, log_id: LogId) -> LogReadStream {
/// bifrost.create_reader(
/// log_id,
/// KeyFilter::Any,
/// bifrost.get_trim_point(log_id).await.unwrap(),
/// bifrost.find_tail(log_id).await.unwrap().offset().prev(),
/// bifrost.find_tail(log_id, FindTailAttr::default()).await.unwrap().offset().prev(),
/// ).unwrap()
/// }
/// ```
Expand Down Expand Up @@ -261,16 +262,22 @@ impl Bifrost {
/// If the log is empty, it returns TailState::Open(Lsn::OLDEST).
/// This should never return Err(Error::LogSealed). Sealed state is represented as
/// TailState::Sealed(..)
pub async fn find_tail(&self, log_id: LogId) -> Result<TailState> {
pub async fn find_tail(&self, log_id: LogId, attrs: FindTailAttr) -> Result<TailState> {
self.inner.fail_if_shutting_down()?;
Ok(self.inner.find_tail(log_id).await?.1)
Ok(self.inner.find_tail(log_id, attrs).await?.1)
}

// Get the loglet currently serving the tail of the chain, for use in integration tests.
#[cfg(any(test, feature = "test-util"))]
pub async fn find_tail_loglet(&self, log_id: LogId) -> Result<Arc<dyn crate::loglet::Loglet>> {
self.inner.fail_if_shutting_down()?;
Ok(self.inner.find_tail(log_id).await?.0.inner().clone())
Ok(self
.inner
.find_tail(log_id, FindTailAttr::default())
.await?
.0
.inner()
.clone())
}

/// The lsn of the slot **before** the first readable record (if it exists), or the offset
Expand All @@ -290,7 +297,7 @@ impl Bifrost {

self.inner.fail_if_shutting_down()?;

let current_tail = self.find_tail(log_id).await?;
let current_tail = self.find_tail(log_id, FindTailAttr::default()).await?;

if current_tail.offset() <= Lsn::OLDEST {
return Ok(Vec::default());
Expand Down Expand Up @@ -368,7 +375,7 @@ impl BifrostInner {
from: Lsn,
) -> Result<Option<crate::LogEntry>> {
use futures::StreamExt;
let (_, tail_state) = self.find_tail(log_id).await?;
let (_, tail_state) = self.find_tail(log_id, FindTailAttr::default()).await?;
if from >= tail_state.offset() {
// Can't use this function to read future records.
return Ok(None);
Expand All @@ -385,7 +392,11 @@ impl BifrostInner {
stream.next().await.transpose()
}

pub async fn find_tail(&self, log_id: LogId) -> Result<(LogletWrapper, TailState)> {
pub async fn find_tail(
&self,
log_id: LogId,
attr: FindTailAttr,
) -> Result<(LogletWrapper, TailState)> {
let loglet = self.writeable_loglet(log_id).await?;
let start = Instant::now();
// uses the same retry policy as reads to not add too many configuration keys
Expand All @@ -396,7 +407,7 @@ impl BifrostInner {
.clone()
.into_iter();
loop {
match loglet.find_tail().await {
match loglet.find_tail(attr).await {
Ok(tail) => {
if logged {
info!(
Expand Down Expand Up @@ -680,7 +691,9 @@ mod tests {
assert_eq!(max_lsn + Lsn::from(1), lsn);
max_lsn = lsn;

let tail = bifrost.find_tail(LogId::new(0)).await?;
let tail = bifrost
.find_tail(LogId::new(0), FindTailAttr::default())
.await?;
assert_eq!(max_lsn.next(), tail.offset());

// Initiate shutdown
Expand Down Expand Up @@ -725,7 +738,13 @@ mod tests {

let bifrost = Bifrost::init_local(node_env.metadata_writer).await;

assert_eq!(Lsn::OLDEST, bifrost.find_tail(LOG_ID).await?.offset());
assert_eq!(
Lsn::OLDEST,
bifrost
.find_tail(LOG_ID, FindTailAttr::default())
.await?
.offset()
);

assert_eq!(Lsn::INVALID, bifrost.get_trim_point(LOG_ID).await?);

Expand All @@ -737,7 +756,7 @@ mod tests {

bifrost.admin().trim(LOG_ID, Lsn::from(5)).await?;

let tail = bifrost.find_tail(LOG_ID).await?;
let tail = bifrost.find_tail(LOG_ID, FindTailAttr::default()).await?;
assert_eq!(tail.offset(), Lsn::from(11));
assert!(!tail.is_sealed());
assert_eq!(Lsn::from(5), bifrost.get_trim_point(LOG_ID).await?);
Expand All @@ -759,7 +778,13 @@ mod tests {
// trimming beyond the release point will fall back to the release point
bifrost.admin().trim(LOG_ID, Lsn::MAX).await?;

assert_eq!(Lsn::from(11), bifrost.find_tail(LOG_ID).await?.offset());
assert_eq!(
Lsn::from(11),
bifrost
.find_tail(LOG_ID, FindTailAttr::default())
.await?
.offset()
);
let new_trim_point = bifrost.get_trim_point(LOG_ID).await?;
assert_eq!(Lsn::from(10), new_trim_point);

Expand Down Expand Up @@ -803,7 +828,7 @@ mod tests {

// not sealed, tail is what we expect
assert_that!(
bifrost.find_tail(LOG_ID).await?,
bifrost.find_tail(LOG_ID, FindTailAttr::default()).await?,
pat!(TailState::Open(eq(Lsn::new(6))))
);

Expand All @@ -821,7 +846,7 @@ mod tests {

// sealed, tail is what we expect
assert_that!(
bifrost.find_tail(LOG_ID).await?,
bifrost.find_tail(LOG_ID, FindTailAttr::default()).await?,
pat!(TailState::Sealed(eq(Lsn::new(6))))
);

Expand Down Expand Up @@ -873,7 +898,7 @@ mod tests {
// find_tail() on the underlying loglet returns (6) but for bifrost it should be (5) after
// the new segment was created at tail of the chain with base_lsn=5
assert_that!(
bifrost.find_tail(LOG_ID).await?,
bifrost.find_tail(LOG_ID, FindTailAttr::default()).await?,
pat!(TailState::Open(eq(Lsn::new(5))))
);

Expand All @@ -888,13 +913,13 @@ mod tests {

// tail is now 8 and open.
assert_that!(
bifrost.find_tail(LOG_ID).await?,
bifrost.find_tail(LOG_ID, FindTailAttr::default()).await?,
pat!(TailState::Open(eq(Lsn::new(8))))
);

// validating that segment 1 is still sealed and has its own tail at Lsn (6)
assert_that!(
segment_1.find_tail().await?,
segment_1.find_tail(FindTailAttr::default()).await?,
pat!(TailState::Sealed(eq(Lsn::new(6))))
);

Expand All @@ -908,7 +933,7 @@ mod tests {

// segment 2 is open and at 8 as previously validated through bifrost interface
assert_that!(
segment_2.find_tail().await?,
segment_2.find_tail(FindTailAttr::default()).await?,
pat!(TailState::Open(eq(Lsn::new(8))))
);

Expand Down
3 changes: 2 additions & 1 deletion crates/bifrost/src/bifrost_admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use restate_types::Version;

use crate::bifrost::BifrostInner;
use crate::error::AdminError;
use crate::loglet::FindTailAttr;
use crate::loglet_wrapper::LogletWrapper;
use crate::{Error, Result};

Expand Down Expand Up @@ -187,7 +188,7 @@ impl<'a> BifrostAdmin<'a> {
}
}
}
let tail = loglet.find_tail().await?;
let tail = loglet.find_tail(FindTailAttr::default()).await?;

Ok(SealedSegment {
segment_index: loglet.segment_index(),
Expand Down
18 changes: 16 additions & 2 deletions crates/bifrost/src/loglet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,15 @@ pub trait Loglet: Send + Sync {

/// The tail is *the first unwritten position* in the loglet.
///
/// Finds the durable tail of the loglet (last record offset that was durably committed) then
/// Finds the tail of the loglet as selected by `attr` (for `FindTailAttr::Durable`, the last record offset that was durably committed) then
/// it returns the offset **after** it. Virtually, the returned offset is the offset returned
/// after the next `append()` call.
///
/// If the loglet is empty, the loglet should return TailState::Open(Offset::OLDEST).
async fn find_tail(&self) -> Result<TailState<LogletOffset>, OperationError>;
async fn find_tail(
&self,
attr: FindTailAttr,
) -> Result<TailState<LogletOffset>, OperationError>;

/// The offset of the slot **before** the first readable record (if it exists), or the offset
/// before the next slot that will be written to. Must not return Self::INVALID. If the loglet
Expand All @@ -150,6 +153,17 @@ pub trait Loglet: Send + Sync {
async fn seal(&self) -> Result<(), OperationError>;
}

/// Selects which tail a `find_tail` call should report: the durable tail or a cheaper
/// approximation of it.
///
/// The default is [`FindTailAttr::Durable`].
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum FindTailAttr {
    /// Finds the durable tail of the loglet.
    #[default]
    Durable,
    /// Finds an approximate tail of the loglet. This should be cheap to run, depending on
    /// the loglet implementation. If an efficient approximation is not possible, the
    /// implementation must fall back to a durable check.
    Approximate,
}

/// A stream of log records from a single loglet. Loglet streams are _always_ tailing streams.
pub trait LogletReadStream: Stream<Item = Result<LogEntry<LogletOffset>, OperationError>> {
/// Current read pointer. This points to the next offset to be read.
Expand Down
Loading
Loading