Skip to content

Commit

Permalink
namespace: add bottomless checks in exists()
Browse files Browse the repository at this point in the history
Even if the namespace doesn't exist locally, it might have a functional
backup.
  • Loading branch information
psarna committed Dec 15, 2023
1 parent fc5254f commit 4d913f9
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 7 deletions.
27 changes: 27 additions & 0 deletions bottomless/src/replicator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,33 @@ impl Replicator {
})
}

/// Checks if there exists any backup of given database
pub async fn has_backup_of(db_name: impl AsRef<str>, options: &Options) -> Result<bool> {
let prefix = match &options.db_id {
Some(db_id) => format!("{db_id}-"),
None => format!("ns-:{}-", db_name.as_ref()),
};
let config = options.client_config().await?;
let client = Client::from_conf(config);
let bucket = options.bucket_name.clone();

match client.head_bucket().bucket(&bucket).send().await {
Ok(_) => tracing::trace!("Bucket {bucket} exists and is accessible"),
Err(e) => {
tracing::trace!("Bucket checking error: {e}");
return Err(e.into());
}
}

let mut last_frame = 0;
let list_objects = client.list_objects().bucket(&bucket).prefix(&prefix);
let response = list_objects.send().await?;
let _ = Self::try_get_last_frame_no(response, &mut last_frame);
tracing::trace!("Last frame of {prefix}: {last_frame}");

Ok(last_frame > 0)
}

pub async fn shutdown_gracefully(&mut self) -> Result<()> {
tracing::info!("bottomless replicator: shutting down...");
// 1. wait for all committed WAL frames to be committed locally
Expand Down
43 changes: 36 additions & 7 deletions libsql-server/src/namespace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@ impl Default for NamespaceName {
}
}

impl AsRef<str> for NamespaceName {
fn as_ref(&self) -> &str {
self.as_str()
}
}

impl NamespaceName {
pub fn from_string(s: String) -> crate::Result<Self> {
Self::validate(&s)?;
Expand Down Expand Up @@ -170,7 +176,7 @@ pub trait MakeNamespace: Sync + Send + 'static {
meta_store: &MetaStore,
) -> crate::Result<Namespace<Self::Database>>;

fn exists(&self, namespace: &NamespaceName) -> bool;
async fn exists(&self, namespace: &NamespaceName) -> bool;
}

/// Creates new primary `Namespace`
Expand Down Expand Up @@ -287,9 +293,32 @@ impl MakeNamespace for PrimaryNamespaceMaker {
Ok(ns)
}

fn exists(&self, namespace: &NamespaceName) -> bool {
async fn exists(&self, namespace: &NamespaceName) -> bool {
let ns_path = self.config.base_path.join("dbs").join(namespace.as_str());
ns_path.try_exists().unwrap_or(false)
if let Ok(true) = ns_path.try_exists() {
return true;
}

if let Some(replication_options) = self.config.bottomless_replication.as_ref() {
tracing::info!("Bottomless: {:?}", replication_options);
match bottomless::replicator::Replicator::has_backup_of(namespace, replication_options)
.await
{
Ok(true) => {
tracing::debug!("Bottomless: Backup found");
return true;
}
Ok(false) => {
tracing::debug!("Bottomless: No backup found");
}
Err(err) => {
tracing::debug!("Bottomless: Error checking backup: {}", err);
}
}
} else {
tracing::debug!("Bottomless: No backup configured");
}
false
}
}

Expand Down Expand Up @@ -347,7 +376,7 @@ impl MakeNamespace for ReplicaNamespaceMaker {
return Err(ForkError::ForkReplica.into());
}

fn exists(&self, namespace: &NamespaceName) -> bool {
async fn exists(&self, namespace: &NamespaceName) -> bool {
let ns_path = self.config.base_path.join("dbs").join(namespace.as_str());
ns_path.try_exists().unwrap_or(false)
}
Expand Down Expand Up @@ -541,7 +570,7 @@ impl<M: MakeNamespace> NamespaceStore<M> {
}

// check that the source namespace exists
if !self.inner.make_namespace.exists(&from) {
if !self.inner.make_namespace.exists(&from).await {
return Err(crate::error::Error::NamespaceDoesntExist(from.to_string()));
}

Expand Down Expand Up @@ -608,7 +637,7 @@ impl<M: MakeNamespace> NamespaceStore<M> {
let namespace = namespace.clone();
async move {
if namespace != NamespaceName::default()
&& !self.inner.make_namespace.exists(&namespace)
&& !self.inner.make_namespace.exists(&namespace).await
&& !self.inner.allow_lazy_creation
{
return Err(Error::NamespaceDoesntExist(namespace.to_string()));
Expand Down Expand Up @@ -678,7 +707,7 @@ impl<M: MakeNamespace> NamespaceStore<M> {
// otherwise it's an error.
if self.inner.allow_lazy_creation || namespace == NamespaceName::default() {
tracing::trace!("auto-creating the namespace");
} else if self.inner.make_namespace.exists(&namespace) {
} else if self.inner.make_namespace.exists(&namespace).await {
return Err(Error::NamespaceAlreadyExist(namespace.to_string()));
}

Expand Down

0 comments on commit 4d913f9

Please sign in to comment.