From 7c63c39f0325916c179f205e992c70ae8bcb4ccb Mon Sep 17 00:00:00 2001 From: ad hoc Date: Mon, 26 Aug 2024 20:59:40 +0200 Subject: [PATCH 01/52] estimate current durable frame_no from local source in Registry::open --- libsql-wal/src/registry.rs | 52 +++++++++++++++++++------------------- 1 file changed, 26 insertions(+), 26 deletions(-) diff --git a/libsql-wal/src/registry.rs b/libsql-wal/src/registry.rs index fb34224811..e93eb3e869 100644 --- a/libsql-wal/src/registry.rs +++ b/libsql-wal/src/registry.rs @@ -231,14 +231,28 @@ where namespace: &NamespaceName, db_path: &Path, ) -> Result>> { + let db_file = self.io.open(false, true, true, db_path)?; + let db_file_len = db_file.len()?; + let header = if db_file_len > 0 { + let mut header: Sqlite3DbHeader = Sqlite3DbHeader::new_zeroed(); + db_file.read_exact_at(header.as_bytes_mut(), 0)?; + Some(header) + } else { + None + }; + + let footer = self.try_read_footer(&db_file)?; + + let mut checkpointed_frame_no = footer.map(|f| f.replication_index.get()).unwrap_or(0); + let path = self.path.join(namespace.as_str()); self.io.create_dir_all(&path)?; // TODO: handle that with abstract io let dir = walkdir::WalkDir::new(&path).sort_by_file_name().into_iter(); - // TODO: pass config override here - let max_frame_no = self.storage.durable_frame_no_sync(&namespace, None); - let durable_frame_no = Arc::new(Mutex::new(max_frame_no)); + // we only checkpoint durable frame_no so this is a good first estimate without an actual + // network call. + let durable_frame_no = Arc::new(Mutex::new(checkpointed_frame_no)); let tail = SegmentList::default(); for entry in dir { @@ -271,18 +285,6 @@ where } } - let db_file = self.io.open(false, true, true, db_path)?; - let db_file_len = db_file.len()?; - let header = if db_file_len > 0 { - let mut header: Sqlite3DbHeader = Sqlite3DbHeader::new_zeroed(); - db_file.read_exact_at(header.as_bytes_mut(), 0)?; - Some(header) - } else { - None - }; - - let footer = self.try_read_footer(&db_file)?; - let log_id = match footer { Some(footer) if tail.is_empty() => footer.log_id(), None if tail.is_empty() => self.io.uuid(), @@ -316,23 +318,21 @@ where None => (0, NonZeroU64::new(1).unwrap()), }); - let current_path = path.join(format!("{namespace}:{next_frame_no:020}.seg")); + let current_segment_path = path.join(format!("{namespace}:{next_frame_no:020}.seg")); - let segment_file = self.io.open(true, true, true, ¤t_path)?; + let segment_file = self.io.open(true, true, true, ¤t_segment_path)?; let salt = self.io.with_rng(|rng| rng.gen()); - let checkpointed_frame_no = match tail.last() { - // if there is a tail, then the latest checkpointed frame_no is one before the the - // start frame_no of the tail. We must read it from the tail, because a partial - // checkpoint may have occured before a crash. - Some(last) => (last.start_frame_no() - 1).max(1), - // otherwise, we read the it from the footer. - None => footer.map(|f| f.replication_index.get()).unwrap_or(0), - }; + // if there is a tail, then the latest checkpointed frame_no is one before the the + // start frame_no of the tail. We must read it from the tail, because a partial + // checkpoint may have occured before a crash. 
+ if let Some(last) = tail.last() { + checkpointed_frame_no = (last.start_frame_no() - 1).max(1) + } let current = arc_swap::ArcSwap::new(Arc::new(CurrentSegment::create( segment_file, - current_path, + current_segment_path, next_frame_no, db_size, tail.into(), From b4aa51467b80b74932d5e5d8c7ab82307c0a3829 Mon Sep 17 00:00:00 2001 From: ad hoc Date: Mon, 26 Aug 2024 21:00:25 +0200 Subject: [PATCH 02/52] fix oracle test the durable frame no is not always u64::MAX anymore, so we need to wait to it to stabilize --- libsql-wal/src/lib.rs | 14 ++++++++++++++ libsql-wal/src/storage/mod.rs | 6 ++++++ libsql-wal/tests/oracle.rs | 3 ++- 3 files changed, 22 insertions(+), 1 deletion(-) diff --git a/libsql-wal/src/lib.rs b/libsql-wal/src/lib.rs index 1dbcbb8cbd..ce93bc7f08 100644 --- a/libsql-wal/src/lib.rs +++ b/libsql-wal/src/lib.rs @@ -58,6 +58,7 @@ pub mod test { use std::path::Path; use std::path::PathBuf; use std::sync::Arc; + use std::time::Duration; use libsql_sys::name::NamespaceName; use libsql_sys::rusqlite::OpenFlags; @@ -172,4 +173,17 @@ pub mod test { } tx.end(); } + + pub async fn wait_current_durable(shared: &SharedWal) { + let current = shared.current.load().next_frame_no().get() - 1; + loop { + { + if *shared.durable_frame_no.lock() >= current { + break + } + } + + tokio::time::sleep(Duration::from_millis(5)).await; + } + } } diff --git a/libsql-wal/src/storage/mod.rs b/libsql-wal/src/storage/mod.rs index d20e8bb4a1..c042a8976d 100644 --- a/libsql-wal/src/storage/mod.rs +++ b/libsql-wal/src/storage/mod.rs @@ -473,6 +473,12 @@ impl Storage for TestStorage { .or_default() .insert(key, (out_path, index)); tokio::runtime::Handle::current().block_on(on_store(end_frame_no)); + } else { + // HACK: we need to spawn because many tests just call this method indirectly in + // async context. That makes tests easier to write. 
+ tokio::task::spawn_blocking(move || { + tokio::runtime::Handle::current().block_on(on_store(u64::MAX)); + }); } } diff --git a/libsql-wal/tests/oracle.rs b/libsql-wal/tests/oracle.rs index 5752c307ef..357b835900 100644 --- a/libsql-wal/tests/oracle.rs +++ b/libsql-wal/tests/oracle.rs @@ -15,7 +15,7 @@ use libsql_sys::wal::{Sqlite3WalManager, Wal}; use libsql_sys::Connection; use libsql_wal::registry::WalRegistry; use libsql_wal::storage::TestStorage; -use libsql_wal::test::seal_current_segment; +use libsql_wal::test::{seal_current_segment, wait_current_durable}; use libsql_wal::wal::LibsqlWalManager; use once_cell::sync::Lazy; use rand::Rng; @@ -130,6 +130,7 @@ async fn run_test_sample(path: &Path) -> Result { let shared = registry.clone().open(&db_path, &"test".into()).unwrap(); seal_current_segment(&shared); + wait_current_durable(&shared).await; shared.checkpoint().await.unwrap(); std::env::set_current_dir(curdir).unwrap(); From eec17aa7f1c06fbc8123cfa8b6301637bd4be207 Mon Sep 17 00:00:00 2001 From: ad hoc Date: Mon, 26 Aug 2024 21:01:40 +0200 Subject: [PATCH 03/52] return result from durable_frame_no and catch storage error --- libsql-wal/src/storage/async_storage.rs | 8 ++++---- libsql-wal/src/storage/mod.rs | 12 ++++++------ 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/libsql-wal/src/storage/async_storage.rs b/libsql-wal/src/storage/async_storage.rs index d679fa1e2a..0ea1f35dc8 100644 --- a/libsql-wal/src/storage/async_storage.rs +++ b/libsql-wal/src/storage/async_storage.rs @@ -205,10 +205,10 @@ where &self, namespace: &NamespaceName, config_override: Option, - ) -> u64 { + ) -> super::Result { let config = config_override.unwrap_or_else(|| self.backend.default_config()); - let meta = self.backend.meta(&config, namespace).await.unwrap(); - meta.max_frame_no + let meta = self.backend.meta(&config, namespace).await?; + Ok(meta.max_frame_no) } async fn restore( @@ -230,7 +230,7 @@ where config_override: Option, ) -> u64 { tokio::runtime::Handle::current() - .block_on(self.durable_frame_no(namespace, config_override)) + .block_on(self.durable_frame_no(namespace, config_override)).unwrap() } async fn find_segment( diff --git a/libsql-wal/src/storage/mod.rs b/libsql-wal/src/storage/mod.rs index c042a8976d..9d38de0823 100644 --- a/libsql-wal/src/storage/mod.rs +++ b/libsql-wal/src/storage/mod.rs @@ -155,7 +155,7 @@ pub trait Storage: Send + Sync + 'static { &self, namespace: &NamespaceName, config_override: Option, - ) -> u64; + ) -> Result; async fn restore( &self, @@ -242,7 +242,7 @@ where &self, namespace: &NamespaceName, config_override: Option, - ) -> u64 { + ) -> Result { match zip(self, config_override) { Either::A((s, c)) => s.durable_frame_no(namespace, c).await, Either::B((s, c)) => s.durable_frame_no(namespace, c).await, @@ -341,8 +341,8 @@ impl Storage for NoStorage { &self, namespace: &NamespaceName, config: Option, - ) -> u64 { - self.durable_frame_no_sync(namespace, config) + ) -> Result { + Ok(self.durable_frame_no_sync(namespace, config)) } async fn restore( @@ -486,8 +486,8 @@ impl Storage for TestStorage { &self, namespace: &NamespaceName, config: Option, - ) -> u64 { - self.durable_frame_no_sync(namespace, config) + ) -> Result { + Ok(self.durable_frame_no_sync(namespace, config)) } async fn restore( From e183eaf8abcc7e0bb08f2631a475ab67c8f06052 Mon Sep 17 00:00:00 2001 From: ad hoc Date: Mon, 26 Aug 2024 21:01:59 +0200 Subject: [PATCH 04/52] catch storage error --- libsql-wal/src/error.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git 
a/libsql-wal/src/error.rs b/libsql-wal/src/error.rs index 9acda39766..f382ba6f16 100644 --- a/libsql-wal/src/error.rs +++ b/libsql-wal/src/error.rs @@ -24,6 +24,9 @@ pub enum Error { InvalidFooterMagic, #[error("invalid db footer version")] InvalidFooterVersion, + + #[error("storage error: {0}")] + Storage(#[from] crate::storage::Error), } impl Into for Error { From 46a7713f4a4f22875df6a27901c8ec3a21e7bfcf Mon Sep 17 00:00:00 2001 From: ad hoc Date: Mon, 26 Aug 2024 21:02:14 +0200 Subject: [PATCH 05/52] registry sync_all skeleton --- libsql-wal/src/bins/shell/main.rs | 2 +- libsql-wal/src/error.rs | 2 +- libsql-wal/src/registry.rs | 29 +++++++++++++ libsql-wal/src/storage/async_storage.rs | 9 ---- libsql-wal/src/storage/mod.rs | 57 ++++--------------------- 5 files changed, 39 insertions(+), 60 deletions(-) diff --git a/libsql-wal/src/bins/shell/main.rs b/libsql-wal/src/bins/shell/main.rs index 18522b7305..af8eec962b 100644 --- a/libsql-wal/src/bins/shell/main.rs +++ b/libsql-wal/src/bins/shell/main.rs @@ -143,7 +143,7 @@ where S: Storage, { let namespace = NamespaceName::from_string(namespace.to_owned()); - let durable = storage.durable_frame_no(&namespace, None).await; + let durable = storage.durable_frame_no(&namespace, None).await.unwrap(); println!("namespace: {namespace}"); println!("max durable frame: {durable}"); } diff --git a/libsql-wal/src/error.rs b/libsql-wal/src/error.rs index f382ba6f16..28081d9f51 100644 --- a/libsql-wal/src/error.rs +++ b/libsql-wal/src/error.rs @@ -26,7 +26,7 @@ pub enum Error { InvalidFooterVersion, #[error("storage error: {0}")] - Storage(#[from] crate::storage::Error), + Storage(#[from] Box), } impl Into for Error { diff --git a/libsql-wal/src/registry.rs b/libsql-wal/src/registry.rs index e93eb3e869..ae9ed0ce19 100644 --- a/libsql-wal/src/registry.rs +++ b/libsql-wal/src/registry.rs @@ -380,6 +380,18 @@ where } } + /// Attempts to sync all loaded dbs with durable storage + pub async fn sync_all(&self) -> Result<()> + where S: Storage, + { + for entry in self.opened.iter() { + let Slot::Wal(shared) = entry.value() else { panic!("all wals should already be opened") }; + sync_one(shared, self.storage.as_ref()).await?; + } + + Ok(()) + } + // On shutdown, we checkpoint all the WALs. 
This require sealing the current segment, and when // checkpointing all the segments pub async fn shutdown(self: Arc) -> Result<()> { @@ -439,6 +451,23 @@ where } } +#[tracing::instrument(skip_all, fields(namespace = shared.namespace.as_str()))] +async fn sync_one(shared: &SharedWal, storage: &S) -> Result<()> +where IO: Io, + S: Storage +{ + let remote_durable_frame_no = storage.durable_frame_no(&shared.namespace, None).await.map_err(Box::new)?; + let local_current_frame_no = shared.current.load().next_frame_no().get() - 1; + + if remote_durable_frame_no >= local_current_frame_no { + tracing::info!(remote_durable_frame_no, local_current_frame_no, "remote storage has newer segments"); + } else { + tracing::info!("local database is up to date"); + } + + Ok(()) +} + fn read_log_id_from_footer(db_file: &F, db_size: u64) -> io::Result { let mut footer: LibsqlFooter = LibsqlFooter::new_zeroed(); let footer_offset = LIBSQL_PAGE_SIZE as u64 * db_size; diff --git a/libsql-wal/src/storage/async_storage.rs b/libsql-wal/src/storage/async_storage.rs index 0ea1f35dc8..843ca6c920 100644 --- a/libsql-wal/src/storage/async_storage.rs +++ b/libsql-wal/src/storage/async_storage.rs @@ -224,15 +224,6 @@ where .await } - fn durable_frame_no_sync( - &self, - namespace: &NamespaceName, - config_override: Option, - ) -> u64 { - tokio::runtime::Handle::current() - .block_on(self.durable_frame_no(namespace, config_override)).unwrap() - } - async fn find_segment( &self, namespace: &NamespaceName, diff --git a/libsql-wal/src/storage/mod.rs b/libsql-wal/src/storage/mod.rs index 9d38de0823..108d9de7d7 100644 --- a/libsql-wal/src/storage/mod.rs +++ b/libsql-wal/src/storage/mod.rs @@ -145,17 +145,11 @@ pub trait Storage: Send + Sync + 'static { on_store: OnStoreCallback, ); - fn durable_frame_no_sync( + fn durable_frame_no( &self, namespace: &NamespaceName, config_override: Option, - ) -> u64; - - async fn durable_frame_no( - &self, - namespace: &NamespaceName, - config_override: Option, - ) -> Result; + ) -> impl Future> + Send; async fn restore( &self, @@ -227,17 +221,6 @@ where } } - fn durable_frame_no_sync( - &self, - namespace: &NamespaceName, - config_override: Option, - ) -> u64 { - match zip(self, config_override) { - Either::A((s, c)) => s.durable_frame_no_sync(namespace, c), - Either::B((s, c)) => s.durable_frame_no_sync(namespace, c), - } - } - async fn durable_frame_no( &self, namespace: &NamespaceName, @@ -339,10 +322,10 @@ impl Storage for NoStorage { async fn durable_frame_no( &self, - namespace: &NamespaceName, - config: Option, + _namespace: &NamespaceName, + _config: Option, ) -> Result { - Ok(self.durable_frame_no_sync(namespace, config)) + Ok(u64::MAX) } async fn restore( @@ -355,14 +338,6 @@ impl Storage for NoStorage { panic!("can restore from no storage") } - fn durable_frame_no_sync( - &self, - _namespace: &NamespaceName, - _config_override: Option, - ) -> u64 { - u64::MAX - } - async fn find_segment( &self, _namespace: &NamespaceName, @@ -484,10 +459,10 @@ impl Storage for TestStorage { async fn durable_frame_no( &self, - namespace: &NamespaceName, - config: Option, + _namespace: &NamespaceName, + _config: Option, ) -> Result { - Ok(self.durable_frame_no_sync(namespace, config)) + Ok(u64::MAX) } async fn restore( @@ -500,22 +475,6 @@ impl Storage for TestStorage { todo!(); } - fn durable_frame_no_sync( - &self, - namespace: &NamespaceName, - _config_override: Option, - ) -> u64 { - let inner = self.inner.lock_blocking(); - if inner.store { - let Some(segs) = inner.stored.get(namespace) else { - 
return 0; - }; - segs.keys().map(|k| k.end_frame_no).max().unwrap_or(0) - } else { - u64::MAX - } - } - async fn find_segment( &self, namespace: &NamespaceName, From 5ebf0e4b8b64a9167a09a28d93913cb0e3a890e2 Mon Sep 17 00:00:00 2001 From: ad hoc Date: Tue, 27 Aug 2024 08:40:19 +0200 Subject: [PATCH 06/52] injector creates txn --- libsql-server/src/bottomless_migrate.rs | 8 +------- .../src/namespace/configurator/libsql_replica.rs | 9 +-------- libsql-wal/src/registry.rs | 1 + libsql-wal/src/replication/injector.rs | 16 +++++++--------- 4 files changed, 10 insertions(+), 24 deletions(-) diff --git a/libsql-server/src/bottomless_migrate.rs b/libsql-server/src/bottomless_migrate.rs index 75df6f8e19..2b9067baff 100644 --- a/libsql-server/src/bottomless_migrate.rs +++ b/libsql-server/src/bottomless_migrate.rs @@ -162,13 +162,7 @@ async fn migrate_one( .await .unwrap()?; - let mut tx = shared.begin_read(0).into(); - shared.upgrade(&mut tx).unwrap(); - let guard = tx - .into_write() - .unwrap_or_else(|_| panic!("should be a write txn")) - .into_lock_owned(); - let mut injector = Injector::new(shared.clone(), guard, 10)?; + let mut injector = Injector::new(shared.clone(), 10)?; let orig_db_path = base_path .join("dbs") .join(config.namespace().as_str()) diff --git a/libsql-server/src/namespace/configurator/libsql_replica.rs b/libsql-server/src/namespace/configurator/libsql_replica.rs index 50ecc95610..1a67dfd1e1 100644 --- a/libsql-server/src/namespace/configurator/libsql_replica.rs +++ b/libsql-server/src/namespace/configurator/libsql_replica.rs @@ -11,7 +11,6 @@ use libsql_sys::name::NamespaceResolver; use libsql_wal::io::StdIO; use libsql_wal::registry::WalRegistry; use libsql_wal::replication::injector::Injector; -use libsql_wal::transaction::Transaction; use libsql_wal::wal::LibsqlWalManager; use tokio::task::JoinSet; use tonic::transport::Channel; @@ -151,13 +150,7 @@ impl ConfigureNamespace for LibsqlReplicaConfigurator { .await .unwrap(); - let mut tx = Transaction::Read(shared.begin_read(u64::MAX)); - shared.upgrade(&mut tx).unwrap(); - let guard = tx - .into_write() - .unwrap_or_else(|_| panic!()) - .into_lock_owned(); - let injector = Injector::new(shared, guard, 10).unwrap(); + let injector = Injector::new(shared, 10).unwrap(); let injector = LibsqlInjector::new(injector); let mut replicator = Replicator::new(client, injector); diff --git a/libsql-wal/src/registry.rs b/libsql-wal/src/registry.rs index ae9ed0ce19..7a29ccb33c 100644 --- a/libsql-wal/src/registry.rs +++ b/libsql-wal/src/registry.rs @@ -384,6 +384,7 @@ where pub async fn sync_all(&self) -> Result<()> where S: Storage, { + tracing::info!("syncing {} namespaces", self.opened.len()); for entry in self.opened.iter() { let Slot::Wal(shared) = entry.value() else { panic!("all wals should already be opened") }; sync_one(shared, self.storage.as_ref()).await?; diff --git a/libsql-wal/src/replication/injector.rs b/libsql-wal/src/replication/injector.rs index 3a152b412e..c4d47e5bf4 100644 --- a/libsql-wal/src/replication/injector.rs +++ b/libsql-wal/src/replication/injector.rs @@ -2,11 +2,13 @@ use std::sync::Arc; +use tokio_stream::{Stream, StreamExt}; + use crate::error::Result; use crate::io::Io; use crate::segment::Frame; use crate::shared_wal::SharedWal; -use crate::transaction::TxGuardOwned; +use crate::transaction::{Transaction, TxGuardOwned}; /// The injector takes frames and injects them in the wal. 
pub struct Injector { @@ -22,9 +24,11 @@ pub struct Injector { impl Injector { pub fn new( wal: Arc>, - tx: TxGuardOwned, buffer_capacity: usize, ) -> Result { + let mut tx = Transaction::Read(wal.begin_read(u64::MAX)); + wal.upgrade(&mut tx)?; + let tx = tx.into_write().unwrap_or_else(|_| unreachable!()).into_lock_owned(); Ok(Self { wal, buffer: Vec::with_capacity(buffer_capacity), @@ -93,13 +97,7 @@ mod test { let replica_conn = replica_env.open_conn("test"); let replica_shared = replica_env.shared("test"); - let mut tx = crate::transaction::Transaction::Read(replica_shared.begin_read(42)); - replica_shared.upgrade(&mut tx).unwrap(); - let guard = tx - .into_write() - .unwrap_or_else(|_| panic!()) - .into_lock_owned(); - let mut injector = Injector::new(replica_shared.clone(), guard, 10).unwrap(); + let mut injector = Injector::new(replica_shared.clone(), 10).unwrap(); primary_conn.execute("create table test (x)", ()).unwrap(); From 5fc361410e2c186e79b7cd41855ebe15a1924031 Mon Sep 17 00:00:00 2001 From: ad hoc Date: Tue, 27 Aug 2024 08:40:43 +0200 Subject: [PATCH 07/52] Injector::inject_stream --- libsql-wal/src/replication/injector.rs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/libsql-wal/src/replication/injector.rs b/libsql-wal/src/replication/injector.rs index c4d47e5bf4..8ae08d9d83 100644 --- a/libsql-wal/src/replication/injector.rs +++ b/libsql-wal/src/replication/injector.rs @@ -38,6 +38,19 @@ impl Injector { }) } + pub async fn inject_stream(&mut self, stream: impl Stream>>) -> Result<()> { + tokio::pin!(stream); + loop { + match stream.next().await { + Some(Ok(frame)) => { + self.insert_frame(frame).await?; + }, + Some(Err(e)) => return Err(e), + None => return Ok(()), + } + } + } + pub async fn insert_frame(&mut self, frame: Box) -> Result> { let size_after = frame.size_after(); self.max_tx_frame_no = self.max_tx_frame_no.max(frame.header().frame_no()); From c60b9e715ee91bc326c547f1d40b8065fdac2c81 Mon Sep 17 00:00:00 2001 From: ad hoc Date: Tue, 27 Aug 2024 08:41:16 +0200 Subject: [PATCH 08/52] rename tail to list --- libsql-wal/src/registry.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/libsql-wal/src/registry.rs b/libsql-wal/src/registry.rs index 7a29ccb33c..839f81971f 100644 --- a/libsql-wal/src/registry.rs +++ b/libsql-wal/src/registry.rs @@ -254,7 +254,7 @@ where // network call. 
let durable_frame_no = Arc::new(Mutex::new(checkpointed_frame_no)); - let tail = SegmentList::default(); + let list = SegmentList::default(); for entry in dir { let entry = entry.map_err(|e| e.into_io_error().unwrap())?; if entry @@ -281,15 +281,15 @@ where }); // TODO: pass config override here self.storage.store(&namespace, sealed.clone(), None, cb); - tail.push(sealed); + list.push(sealed); } } let log_id = match footer { - Some(footer) if tail.is_empty() => footer.log_id(), - None if tail.is_empty() => self.io.uuid(), + Some(footer) if list.is_empty() => footer.log_id(), + None if list.is_empty() => self.io.uuid(), Some(footer) => { - let log_id = tail + let log_id = list .with_head(|h| h.header().log_id.get()) .expect("non-empty list should have a head"); let log_id = Uuid::from_u128(log_id); @@ -297,14 +297,14 @@ where log_id } None => { - let log_id = tail + let log_id = list .with_head(|h| h.header().log_id.get()) .expect("non-empty list should have a head"); Uuid::from_u128(log_id) } }; - let (db_size, next_frame_no) = tail + let (db_size, next_frame_no) = list .with_head(|segment| { let header = segment.header(); (header.size_after(), header.next_frame_no()) @@ -335,7 +335,7 @@ where current_segment_path, next_frame_no, db_size, - tail.into(), + list.into(), salt, log_id, )?)); From b59641f4dbfa729330b11c63d51c7956336c8cd8 Mon Sep 17 00:00:00 2001 From: ad hoc Date: Tue, 27 Aug 2024 08:41:36 +0200 Subject: [PATCH 09/52] compute next_frame_no correctly from empty tail. --- libsql-wal/src/registry.rs | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/libsql-wal/src/registry.rs b/libsql-wal/src/registry.rs index 839f81971f..de8997523a 100644 --- a/libsql-wal/src/registry.rs +++ b/libsql-wal/src/registry.rs @@ -304,32 +304,32 @@ where } }; + // if there is a tail, then the latest checkpointed frame_no is one before the the + // start frame_no of the tail. We must read it from the tail, because a partial + // checkpoint may have occured before a crash. + if let Some(last) = list.last() { + checkpointed_frame_no = (last.start_frame_no() - 1).max(1) + } + let (db_size, next_frame_no) = list .with_head(|segment| { - let header = segment.header(); + let header = dbg!(segment.header()); (header.size_after(), header.next_frame_no()) }) - .unwrap_or_else(|| match header { - Some(header) => ( - header.db_size.get(), - NonZeroU64::new(header.replication_index.get() + 1) - .unwrap_or(NonZeroU64::new(1).unwrap()), - ), - None => (0, NonZeroU64::new(1).unwrap()), - }); + .unwrap_or_else(|| match header { + Some(header) => ( + header.db_size.get(), + NonZeroU64::new(checkpointed_frame_no + 1) + .unwrap_or(NonZeroU64::new(1).unwrap()), + ), + None => (0, NonZeroU64::new(1).unwrap()), + }); let current_segment_path = path.join(format!("{namespace}:{next_frame_no:020}.seg")); let segment_file = self.io.open(true, true, true, ¤t_segment_path)?; let salt = self.io.with_rng(|rng| rng.gen()); - // if there is a tail, then the latest checkpointed frame_no is one before the the - // start frame_no of the tail. We must read it from the tail, because a partial - // checkpoint may have occured before a crash. 
- if let Some(last) = tail.last() { - checkpointed_frame_no = (last.start_frame_no() - 1).max(1) - } - let current = arc_swap::ArcSwap::new(Arc::new(CurrentSegment::create( segment_file, current_segment_path, From 9c5a2eb328c37bb0a287c2165c73c82fe3ff26e6 Mon Sep 17 00:00:00 2001 From: ad hoc Date: Tue, 27 Aug 2024 16:15:45 +0200 Subject: [PATCH 10/52] fix copy_buf implementation --- libsql-wal/src/io/compat.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/libsql-wal/src/io/compat.rs b/libsql-wal/src/io/compat.rs index 461cbfdc39..78ad7752d6 100644 --- a/libsql-wal/src/io/compat.rs +++ b/libsql-wal/src/io/compat.rs @@ -14,7 +14,7 @@ where R: AsyncRead + Unpin, { let mut dst_offset = 0u64; - let mut buffer = BytesMut::zeroed(4096); + let mut buffer = BytesMut::with_capacity(4096); loop { let n = src.read_buf(&mut buffer).await?; if n == 0 { @@ -22,7 +22,7 @@ where } let (b, ret) = dst.write_all_at_async(buffer, dst_offset).await; ret?; - dst_offset += b.len() as u64; + dst_offset += n as u64; buffer = b; buffer.clear(); } From fbb40da366ad9d38c5c4c0133c15ab94c0251d96 Mon Sep 17 00:00:00 2001 From: ad hoc Date: Wed, 28 Aug 2024 11:12:55 +0200 Subject: [PATCH 11/52] fix missing SEALED header on recovery --- libsql-wal/src/segment/sealed.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/libsql-wal/src/segment/sealed.rs b/libsql-wal/src/segment/sealed.rs index ddc71f4168..0fa88096e1 100644 --- a/libsql-wal/src/segment/sealed.rs +++ b/libsql-wal/src/segment/sealed.rs @@ -304,6 +304,8 @@ impl SealedSegment { header.index_size = index_size.into(); header.last_commited_frame_no = last_committed.into(); header.size_after = size_after.into(); + let flags = header.flags(); + header.set_flags(flags | SegmentFlags::SEALED); header.recompute_checksum(); file.write_all_at(header.as_bytes(), 0)?; let index = Map::new(index_bytes.into()).unwrap(); From a27062fb824994aad00d6a1855ed3805277a500e Mon Sep 17 00:00:00 2001 From: ad hoc Date: Thu, 29 Aug 2024 15:38:00 +0200 Subject: [PATCH 12/52] sync config --- libsql-server/src/lib.rs | 25 ++++++++++++++++++++----- libsql-server/src/main.rs | 25 +++++++++++++++++++++++++ 2 files changed, 45 insertions(+), 5 deletions(-) diff --git a/libsql-server/src/lib.rs b/libsql-server/src/lib.rs index cd415eef33..4eb0d1b9c1 100644 --- a/libsql-server/src/lib.rs +++ b/libsql-server/src/lib.rs @@ -177,6 +177,9 @@ pub struct Server, pub migrate_bottomless: bool, pub enable_deadlock_monitor: bool, + pub should_sync_from_storage: bool, + pub force_load_wals: bool, + pub sync_conccurency: usize, } impl Default for Server { @@ -203,6 +206,9 @@ impl Default for Server { connector: None, migrate_bottomless: false, enable_deadlock_monitor: false, + should_sync_from_storage: false, + force_load_wals: false, + sync_conccurency: 8, } } } @@ -958,19 +964,28 @@ where Ok(()) }); - // If we have performed the migration, load all shared wals to force flush to storage with - // the new registry - if did_migrate { + // If we performed a migration from bottomless to libsql-wal earlier, then we need to + // forecefully load all the wals, to trigger segment storage with the actual storage. This + // is because migration didn't actually send anything to storage, but just created the + // segments. 
+ if did_migrate || self.should_sync_from_storage || self.force_load_wals { + // eagerly load all namespaces, then call sync_all on the registry + // TODO: do conccurently let dbs_path = base_config.base_path.join("dbs"); let stream = meta_store.namespaces(); tokio::pin!(stream); while let Some(conf) = stream.next().await { let registry = registry.clone(); let namespace = conf.namespace().clone(); - let path = dbs_path.join(namespace.as_str()).join("data"); - tokio::task::spawn_blocking(move || registry.open(&path, &namespace.into())) + let path = dbs_path.join(namespace.as_str()); + tokio::fs::create_dir_all(&path).await?; + tokio::task::spawn_blocking(move || registry.open(&path.join("data"), &namespace.into())) .await .unwrap()?; + } + + if self.should_sync_from_storage { + registry.sync_all(self.sync_conccurency).await?; } } diff --git a/libsql-server/src/main.rs b/libsql-server/src/main.rs index eb24126d6e..5e91b4008f 100644 --- a/libsql-server/src/main.rs +++ b/libsql-server/src/main.rs @@ -280,6 +280,28 @@ struct Cli { /// Auth key for the admin API #[clap(long, env = "LIBSQL_ADMIN_AUTH_KEY", requires = "admin_listen_addr")] admin_auth_key: Option, + + /// Whether to perform a sync of all namespaces with remote on startup + #[clap( + long, + env = "LIBSQL_SYNC_FROM_STORAGE", + requires = "enable_bottomless_replication", + )] + sync_from_storage: bool, + /// Whether to force loading all WAL at startup, with libsql-wal + /// By default, WALs are loaded lazily, as the databases are openned. + #[clap( + long, + )] + force_load_wals: bool, + /// Sync conccurency + #[clap( + long, + env = "LIBSQL_SYNC_CONCCURENCY", + requires = "sync_from_storage", + default_value = "8", + )] + sync_conccurency: usize, } #[derive(clap::Subcommand, Debug)] @@ -681,6 +703,9 @@ async fn build_server(config: &Cli) -> anyhow::Result { connector: Some(https), migrate_bottomless: config.migrate_bottomless, enable_deadlock_monitor: config.enable_deadlock_monitor, + should_sync_from_storage: config.sync_from_storage, + force_load_wals: config.force_load_wals, + sync_conccurency: config.sync_conccurency, }) } From 8291d8fa066b522dd121ac93dc961dd060b8580b Mon Sep 17 00:00:00 2001 From: ad hoc Date: Thu, 29 Aug 2024 15:38:11 +0200 Subject: [PATCH 13/52] shutdown store before regsitry (avoid serving more request while registry is shutting down) --- libsql-server/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libsql-server/src/lib.rs b/libsql-server/src/lib.rs index 4eb0d1b9c1..a9c358f027 100644 --- a/libsql-server/src/lib.rs +++ b/libsql-server/src/lib.rs @@ -787,10 +787,10 @@ where tokio::select! 
{ _ = shutdown.notified() => { let shutdown = async { + namespace_store.shutdown().await?; task_manager.shutdown().await?; // join_set.shutdown().await; service_shutdown.notify_waiters(); - namespace_store.shutdown().await?; Ok::<_, crate::Error>(()) }; From 62b1dbe91898d5f7b0255103de755a1fb2280643 Mon Sep 17 00:00:00 2001 From: ad hoc Date: Thu, 29 Aug 2024 15:38:51 +0200 Subject: [PATCH 14/52] clean migration checks --- libsql-server/src/lib.rs | 44 +++++++++++++++++++++------------------- 1 file changed, 23 insertions(+), 21 deletions(-) diff --git a/libsql-server/src/lib.rs b/libsql-server/src/lib.rs index a9c358f027..7d8e002f5b 100644 --- a/libsql-server/src/lib.rs +++ b/libsql-server/src/lib.rs @@ -1251,31 +1251,33 @@ where base_config: &BaseNamespaceConfig, primary_config: &PrimaryConfig, ) -> anyhow::Result { - let is_previous_migration_successful = self.check_previous_migration_success()?; - let is_libsql_wal = matches!(self.use_custom_wal, Some(CustomWAL::LibsqlWal)); - let is_bottomless_enabled = self.db_config.bottomless_replication.is_some(); let is_primary = self.rpc_client_config.is_none(); - let should_attempt_migration = self.migrate_bottomless - && is_primary - && is_bottomless_enabled - && !is_previous_migration_successful - && is_libsql_wal; - - if should_attempt_migration { - bottomless_migrate(meta_store, base_config.clone(), primary_config.clone()).await?; - Ok(true) - } else { - // the wals directory is present and so is the _dbs. This means that a crash occured - // before we could remove it. clean it up now. see code in `bottomless_migrate.rs` - let tmp_dbs_path = base_config.base_path.join("_dbs"); - if tmp_dbs_path.try_exists()? { - tracing::info!("removed dangling `_dbs` folder"); - tokio::fs::remove_dir_all(&tmp_dbs_path).await?; + if self.migrate_bottomless && is_primary { + let is_previous_migration_successful = self.check_previous_migration_success()?; + let is_libsql_wal = matches!(self.use_custom_wal, Some(CustomWAL::LibsqlWal)); + let is_bottomless_enabled = self.db_config.bottomless_replication.is_some(); + let should_attempt_migration = is_bottomless_enabled + && !is_previous_migration_successful + && is_libsql_wal; + + if should_attempt_migration { + bottomless_migrate(meta_store, base_config.clone(), primary_config.clone()).await?; + return Ok(true); + } else { + // the wals directory is present and so is the _dbs. This means that a crash occured + // before we could remove it. clean it up now. see code in `bottomless_migrate.rs` + let tmp_dbs_path = base_config.base_path.join("_dbs"); + if tmp_dbs_path.try_exists()? 
{ + tracing::info!("removed dangling `_dbs` folder"); + tokio::fs::remove_dir_all(&tmp_dbs_path).await?; + } + + tracing::info!("bottomless already migrated, skipping..."); } - tracing::info!("bottomless already migrated, skipping..."); - Ok(false) } + + Ok(false) } fn check_previous_migration_success(&self) -> anyhow::Result { From 26566fe2efeac8ef4213a7a648fcbf7e0aa497db Mon Sep 17 00:00:00 2001 From: ad hoc Date: Thu, 29 Aug 2024 15:39:11 +0200 Subject: [PATCH 15/52] fix typo --- libsql-server/src/namespace/configurator/helpers.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libsql-server/src/namespace/configurator/helpers.rs b/libsql-server/src/namespace/configurator/helpers.rs index 538f9215fd..b259c4aadb 100644 --- a/libsql-server/src/namespace/configurator/helpers.rs +++ b/libsql-server/src/namespace/configurator/helpers.rs @@ -417,7 +417,7 @@ pub(crate) async fn run_storage_monitor( .await; } Err(e) => { - tracing::warn!("failed to open connection for storager monitor: {e}, trying again in {duration:?}"); + tracing::warn!("failed to open connection for storage monitor: {e}, trying again in {duration:?}"); } } From dcbfd1e36d81f74f410480ad5c324db5fdb0a3f9 Mon Sep 17 00:00:00 2001 From: ad hoc Date: Thu, 29 Aug 2024 15:39:44 +0200 Subject: [PATCH 16/52] catch checkpointer panic --- libsql-wal/src/checkpointer.rs | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/libsql-wal/src/checkpointer.rs b/libsql-wal/src/checkpointer.rs index 049ceea6f3..acbcd5539e 100644 --- a/libsql-wal/src/checkpointer.rs +++ b/libsql-wal/src/checkpointer.rs @@ -144,16 +144,21 @@ where async fn step(&mut self) { tokio::select! { biased; - // fixme: we should probably handle a panic in the checkpointing task somehow - Some(Ok((namespace, result))) = self.join_set.join_next(), if !self.join_set.is_empty() => { - self.checkpointing.remove(&namespace); - if let Err(e) = result { - self.errors += 1; - tracing::error!("error checkpointing ns {namespace}: {e}, rescheduling"); - // reschedule - self.scheduled.insert(namespace); - } else { - self.errors = 0; + result = self.join_set.join_next(), if !self.join_set.is_empty() => { + match result { + Some(Ok((namespace, result))) => { + self.checkpointing.remove(&namespace); + if let Err(e) = result { + self.errors += 1; + tracing::error!("error checkpointing ns {namespace}: {e}, rescheduling"); + // reschedule + self.scheduled.insert(namespace); + } else { + self.errors = 0; + } + } + Some(Err(e)) => panic!("checkoint task panicked: {e}"), + None => unreachable!("got None, but join set is not empty") } } notified = self.recv.recv(), if !self.shutting_down => { From eee1cc9c46eb5906d434a2621293659446d73fdd Mon Sep 17 00:00:00 2001 From: ad hoc Date: Thu, 29 Aug 2024 15:40:39 +0200 Subject: [PATCH 17/52] conccurent store_all --- libsql-wal/src/registry.rs | 26 ++++++++++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/libsql-wal/src/registry.rs b/libsql-wal/src/registry.rs index de8997523a..f191b6a804 100644 --- a/libsql-wal/src/registry.rs +++ b/libsql-wal/src/registry.rs @@ -381,15 +381,37 @@ where } /// Attempts to sync all loaded dbs with durable storage - pub async fn sync_all(&self) -> Result<()> + pub async fn sync_all(&self, conccurency: usize) -> Result<()> where S: Storage, { + let mut join_set = JoinSet::new(); tracing::info!("syncing {} namespaces", self.opened.len()); + // FIXME: arbitrary value, maybe use something like numcpu * 2? 
+ let before_sync = Instant::now(); + let sem = Arc::new(Semaphore::new(conccurency)); for entry in self.opened.iter() { let Slot::Wal(shared) = entry.value() else { panic!("all wals should already be opened") }; - sync_one(shared, self.storage.as_ref()).await?; + let storage = self.storage.clone(); + let shared = shared.clone(); + let sem = sem.clone(); + let permit = sem.acquire_owned().await.unwrap(); + + join_set.spawn(async move { + let _permit = permit; + sync_one(shared, storage).await + }); + + if let Some(ret) = join_set.try_join_next() { + ret.unwrap()?; + } } + while let Some(ret) = join_set.join_next().await { + ret.unwrap()?; + } + + tracing::info!("synced in {:?}", before_sync.elapsed()); + Ok(()) } From 9428d9febac2d83b1f423e9e4452e3ee4d0047c2 Mon Sep 17 00:00:00 2001 From: ad hoc Date: Thu, 29 Aug 2024 15:41:07 +0200 Subject: [PATCH 18/52] implement store_one --- libsql-wal/src/registry.rs | 49 +++++++++++++++++++++++++++++++++----- 1 file changed, 43 insertions(+), 6 deletions(-) diff --git a/libsql-wal/src/registry.rs b/libsql-wal/src/registry.rs index f191b6a804..1fd7a7cd19 100644 --- a/libsql-wal/src/registry.rs +++ b/libsql-wal/src/registry.rs @@ -474,20 +474,57 @@ where } } -#[tracing::instrument(skip_all, fields(namespace = shared.namespace.as_str()))] -async fn sync_one(shared: &SharedWal, storage: &S) -> Result<()> +#[tracing::instrument(skip_all, fields(namespace = shared.namespace().as_str()))] +async fn sync_one(shared: Arc>, storage: Arc) -> Result<()> where IO: Io, S: Storage { - let remote_durable_frame_no = storage.durable_frame_no(&shared.namespace, None).await.map_err(Box::new)?; + let remote_durable_frame_no = storage.durable_frame_no(shared.namespace(), None).await.map_err(Box::new)?; let local_current_frame_no = shared.current.load().next_frame_no().get() - 1; - if remote_durable_frame_no >= local_current_frame_no { + if remote_durable_frame_no > local_current_frame_no { tracing::info!(remote_durable_frame_no, local_current_frame_no, "remote storage has newer segments"); - } else { - tracing::info!("local database is up to date"); + let mut seen = RoaringBitmap::new(); + let replicator = StorageReplicator::new(storage, shared.namespace().clone()); + let stream = replicator + .stream(&mut seen, local_current_frame_no, 1) + .peekable(); + let mut injector = Injector::new(shared.clone(), 10)?; + // use pin to the heap so that we can drop the stream in the loop, and count `seen`. + let mut stream = Box::pin(stream); + loop { + match stream.next().await { + Some(Ok(mut frame)) => { + if stream.peek().await.is_none() { + drop(stream); + frame.header_mut().frame_no(); + frame.header_mut().set_size_after(seen.len() as _); + injector.insert_frame(frame).await?; + break + } else { + injector.insert_frame(frame).await?; + } + } + Some(Err(e)) => todo!("handle error: {e}, {}", shared.namespace()), + None => break, + } + } + + let mut tx = Transaction::Write(injector.into_guard().into_inner()); + let ret = { + let mut guard = tx.as_write_mut().unwrap().lock(); + guard.commit(); + // the current segment it unordered, no new frames should be appended to it, seal it and + // open a new segment + shared.swap_current(&guard) + }; + // make sure the tx is always ended before it's dropped! 
+ tx.end(); + ret?; } + tracing::info!("local database is up to date"); + Ok(()) } From 8a08f8d29392ab68978fe2eff2cb1ab31ba37446 Mon Sep 17 00:00:00 2001 From: ad hoc Date: Thu, 29 Aug 2024 15:42:28 +0200 Subject: [PATCH 19/52] conditionally store segment when restoring (or in replica mode), we don't want to be storing the segments again. Before sending the segment to storage, we check that it's storable. For a sealed segment, that means that it's not unordered. --- libsql-wal/src/registry.rs | 71 +++++++++++++++++++++----------- libsql-wal/src/segment/mod.rs | 5 +++ libsql-wal/src/segment/sealed.rs | 37 ++++++++++++++--- libsql-wal/src/storage/job.rs | 4 ++ 4 files changed, 87 insertions(+), 30 deletions(-) diff --git a/libsql-wal/src/registry.rs b/libsql-wal/src/registry.rs index 1fd7a7cd19..f78b448ae9 100644 --- a/libsql-wal/src/registry.rs +++ b/libsql-wal/src/registry.rs @@ -3,13 +3,16 @@ use std::num::NonZeroU64; use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; +use std::time::Instant; use dashmap::DashMap; use libsql_sys::ffi::Sqlite3DbHeader; use parking_lot::{Condvar, Mutex}; use rand::Rng; +use roaring::RoaringBitmap; use tokio::sync::{mpsc, Notify, Semaphore}; use tokio::task::JoinSet; +use tokio_stream::StreamExt; use uuid::Uuid; use zerocopy::{AsBytes, FromZeroes}; @@ -17,13 +20,14 @@ use crate::checkpointer::CheckpointMessage; use crate::error::Result; use crate::io::file::FileExt; use crate::io::{Io, StdIO}; -use crate::replication::storage::StorageReplicator; +use crate::replication::injector::Injector; +use crate::replication::storage::{ReplicateFromStorage as _, StorageReplicator}; use crate::segment::list::SegmentList; use crate::segment::Segment; use crate::segment::{current::CurrentSegment, sealed::SealedSegment}; use crate::shared_wal::{SharedWal, SwapLog}; use crate::storage::{OnStoreCallback, Storage}; -use crate::transaction::TxGuard; +use crate::transaction::{Transaction, TxGuard}; use crate::{LibsqlFooter, LIBSQL_PAGE_SIZE}; use libsql_sys::name::NamespaceName; @@ -124,17 +128,14 @@ where // sealing must the last fallible operation, because we don't want to end up in a situation // where the current log is sealed and it wasn't swapped. if let Some(sealed) = current.seal()? 
{ - // todo: pass config override here - let notifier = self.checkpoint_notifier.clone(); - let namespace = shared.namespace().clone(); - let durable_frame_no = shared.durable_frame_no.clone(); - let cb: OnStoreCallback = Box::new(move |fno| { - Box::pin(async move { - update_durable(fno, notifier, durable_frame_no, namespace).await; - }) - }); new.tail().push(sealed.clone()); - self.storage.store(&shared.namespace, sealed, None, cb); + maybe_store_segment( + self.storage.as_ref(), + &self.checkpoint_notifier, + &shared.namespace, + &shared.durable_frame_no, + sealed, + ); } shared.current.swap(Arc::new(new)); @@ -144,6 +145,31 @@ where } } +#[tracing::instrument(skip_all, fields(namespace = namespace.as_str(), start_frame_no = seg.start_frame_no()))] +fn maybe_store_segment( + storage: &S, + notifier: &tokio::sync::mpsc::Sender, + namespace: &NamespaceName, + durable_frame_no: &Arc>, + seg: S::Segment +) { + if seg.is_storable() { + let cb: OnStoreCallback = Box::new({ + let notifier = notifier.clone(); + let durable_frame_no = durable_frame_no.clone(); + let namespace = namespace.clone(); + move |fno| { + Box::pin(async move { + update_durable(fno, notifier, durable_frame_no, namespace).await; + }) + } + }); + storage.store(namespace, seg, None, cb); + } else { + tracing::debug!("segment marked as not storable; skipping"); + } +} + async fn update_durable( new_durable: u64, notifier: mpsc::Sender, @@ -271,17 +297,14 @@ where if let Some(sealed) = SealedSegment::open(file.into(), entry.path().to_path_buf(), Default::default())? { - let notifier = self.checkpoint_notifier.clone(); - let ns = namespace.clone(); - let durable_frame_no = durable_frame_no.clone(); - let cb: OnStoreCallback = Box::new(move |fno| { - Box::pin(async move { - update_durable(fno, notifier, durable_frame_no, ns).await; - }) - }); - // TODO: pass config override here - self.storage.store(&namespace, sealed.clone(), None, cb); - list.push(sealed); + list.push(sealed.clone()); + maybe_store_segment( + self.storage.as_ref(), + &self.checkpoint_notifier, + &namespace, + &durable_frame_no, + sealed, + ); } } @@ -313,7 +336,7 @@ where let (db_size, next_frame_no) = list .with_head(|segment| { - let header = dbg!(segment.header()); + let header = segment.header(); (header.size_after(), header.next_frame_no()) }) .unwrap_or_else(|| match header { diff --git a/libsql-wal/src/segment/mod.rs b/libsql-wal/src/segment/mod.rs index 2882efaaba..ed074125da 100644 --- a/libsql-wal/src/segment/mod.rs +++ b/libsql-wal/src/segment/mod.rs @@ -160,6 +160,7 @@ pub trait Segment: Send + Sync + 'static { fn start_frame_no(&self) -> u64; fn last_committed(&self) -> u64; fn index(&self) -> &fst::Map>; + fn is_storable(&self) -> bool; fn read_page(&self, page_no: u32, max_frame_no: u64, buf: &mut [u8]) -> io::Result; /// returns the number of readers currently holding a reference to this log. /// The read count must monotonically decrease. 
@@ -216,6 +217,10 @@ impl Segment for Arc { fn destroy(&self, io: &IO) -> impl Future { self.as_ref().destroy(io) } + + fn is_storable(&self) -> bool { + self.as_ref().is_storable() + } } #[repr(C)] diff --git a/libsql-wal/src/segment/sealed.rs b/libsql-wal/src/segment/sealed.rs index 0fa88096e1..0e8f106ef0 100644 --- a/libsql-wal/src/segment/sealed.rs +++ b/libsql-wal/src/segment/sealed.rs @@ -108,8 +108,12 @@ where let mut current_offset = 0; while let Some((page_no_bytes, offset)) = pages.next() { - let (b, ret) = self.read_frame_offset_async(offset as _, buffer).await; - ret.unwrap(); + let (mut b, ret) = self.read_frame_offset_async(offset as _, buffer).await; + ret?; + // transaction boundaries in a segment are completely erased. The responsibility in on + // the user of the segment to place the transaction boundary such that all frames from + // the segment are applied within the same transaction. + b.get_mut().header_mut().set_size_after(0); hasher.update(&b.get_ref().as_bytes()); let dest_offset = size_of::() + current_offset * size_of::(); @@ -191,6 +195,14 @@ where } } } + + fn is_storable(&self) -> bool { + // we don't store unordered segments, since they only happen in two cases: + // - in a replica: no need for storage + // - in a primary, on recovery from storage: we don't want to override remote + // segment. + !self.header().flags().contains(SegmentFlags::FRAME_UNORDERED) + } } impl SealedSegment { @@ -211,7 +223,7 @@ impl SealedSegment { // This happens in case of crash: the segment is not empty, but it wasn't sealed. We need to // recover the index, and seal the segment. if !header.flags().contains(SegmentFlags::SEALED) { - assert_eq!(header.index_offset.get(), 0); + assert_eq!(header.index_offset.get(), 0, "{header:?}"); return Self::recover(file, path, header).map(Some); } @@ -235,8 +247,6 @@ impl SealedSegment { assert_eq!(header.index_size.get(), 0); assert_eq!(header.index_offset.get(), 0); assert!(!header.flags().contains(SegmentFlags::SEALED)); - // recovery for replica log should take a different path (i.e: resync with primary) - assert!(!header.flags().contains(SegmentFlags::FRAME_UNORDERED)); let mut current_checksum = header.salt.get(); tracing::trace!("recovering unsealed segment at {path:?}"); @@ -246,6 +256,11 @@ impl SealedSegment { let mut last_committed = 0; let mut size_after = 0; let mut frame_count = 0; + // When the segment is ordered, then the biggest frame_no is the last commited + // frame. This is not the case for an unordered segment (in case of recovery or + // a replica), so we track the biggest frame_no and set last_commited to that + // value on a commit frame + let mut max_seen_frame_no = 0; for i in 0.. { let offset = checked_frame_offset(i as u32); match file.read_exact_at(frame.as_bytes_mut(), offset) { @@ -263,9 +278,19 @@ impl SealedSegment { current_checksum = new_checksum; frame_count += 1; + // this must always hold for a ordered segment. 
+ #[cfg(debug_assertions)] + { + if !header.flags().contains(SegmentFlags::FRAME_UNORDERED) { + assert!(frame.frame.header().frame_no() > max_seen_frame_no); + } + } + + max_seen_frame_no = max_seen_frame_no.max(frame.frame.header.frame_no()); + current_tx.push(frame.frame.header().page_no()); if frame.frame.header.is_commit() { - last_committed = frame.frame.header().frame_no(); + last_committed = max_seen_frame_no; size_after = frame.frame.header().size_after(); let base_offset = (i + 1) - current_tx.len(); for (frame_offset, page_no) in current_tx.drain(..).enumerate() { diff --git a/libsql-wal/src/storage/job.rs b/libsql-wal/src/storage/job.rs index 07f5aa0cd6..797170644c 100644 --- a/libsql-wal/src/storage/job.rs +++ b/libsql-wal/src/storage/job.rs @@ -423,6 +423,10 @@ mod test { fn destroy(&self, _io: &IO) -> impl std::future::Future { async move { todo!() } } + + fn is_storable(&self) -> bool { + true + } } struct TestBackend; From 7f4c7bfb9258d69986faaa18c5b8a96ec00bc372 Mon Sep 17 00:00:00 2001 From: ad hoc Date: Thu, 29 Aug 2024 15:43:53 +0200 Subject: [PATCH 20/52] always end txn before dropping --- libsql-wal/src/shared_wal.rs | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/libsql-wal/src/shared_wal.rs b/libsql-wal/src/shared_wal.rs index 74b609cdee..e94883fd01 100644 --- a/libsql-wal/src/shared_wal.rs +++ b/libsql-wal/src/shared_wal.rs @@ -284,14 +284,19 @@ impl SharedWal { pub fn seal_current(&self) -> Result<()> { let mut tx = self.begin_read(u64::MAX).into(); self.upgrade(&mut tx)?; - { + + let ret = { let mut guard = tx.as_write_mut().unwrap().lock(); guard.commit(); - self.swap_current(&mut guard)?; - } + self.swap_current(&mut guard) + }; + // make sure the tx is always ended before it's dropped! + // FIXME: this is an issue with this design, since downgrade consume self, we can't have a + // drop implementation. The should probably have a Option, to that we can + // take &mut Self instead. tx.end(); - Ok(()) + ret } /// Swap the current log. A write lock must be held, but the transaction must be must be committed already. 
From 85530d4eaa0fbae043fcb3337d9b5d605cd60fe1 Mon Sep 17 00:00:00 2001 From: ad hoc Date: Thu, 29 Aug 2024 15:45:04 +0200 Subject: [PATCH 21/52] remove inject_stream from injector --- libsql-wal/src/replication/injector.rs | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/libsql-wal/src/replication/injector.rs b/libsql-wal/src/replication/injector.rs index 8ae08d9d83..76a61228d5 100644 --- a/libsql-wal/src/replication/injector.rs +++ b/libsql-wal/src/replication/injector.rs @@ -2,8 +2,6 @@ use std::sync::Arc; -use tokio_stream::{Stream, StreamExt}; - use crate::error::Result; use crate::io::Io; use crate::segment::Frame; @@ -38,19 +36,6 @@ impl Injector { }) } - pub async fn inject_stream(&mut self, stream: impl Stream>>) -> Result<()> { - tokio::pin!(stream); - loop { - match stream.next().await { - Some(Ok(frame)) => { - self.insert_frame(frame).await?; - }, - Some(Err(e)) => return Err(e), - None => return Ok(()), - } - } - } - pub async fn insert_frame(&mut self, frame: Box) -> Result> { let size_after = frame.size_after(); self.max_tx_frame_no = self.max_tx_frame_no.max(frame.header().frame_no()); From 8d5d1416b9f989e3bda5fe3ef8ac644ac10f2c0e Mon Sep 17 00:00:00 2001 From: ad hoc Date: Thu, 29 Aug 2024 15:45:24 +0200 Subject: [PATCH 22/52] add Injector::into_guard --- libsql-wal/src/replication/injector.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/libsql-wal/src/replication/injector.rs b/libsql-wal/src/replication/injector.rs index 76a61228d5..500f7fa8d0 100644 --- a/libsql-wal/src/replication/injector.rs +++ b/libsql-wal/src/replication/injector.rs @@ -68,6 +68,10 @@ impl Injector { self.buffer.clear(); self.tx.reset(0); } + + pub(crate) fn into_guard(self) -> TxGuardOwned { + self.tx + } } #[cfg(test)] From 64c9cf9cfb71c4ea54802c85ca10a2a3a3b28eca Mon Sep 17 00:00:00 2001 From: ad hoc Date: Thu, 29 Aug 2024 15:45:51 +0200 Subject: [PATCH 23/52] add assertion ensure well formedness of the segments --- libsql-wal/src/replication/storage.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/libsql-wal/src/replication/storage.rs b/libsql-wal/src/replication/storage.rs index 35ea89fb09..ae3c13ac97 100644 --- a/libsql-wal/src/replication/storage.rs +++ b/libsql-wal/src/replication/storage.rs @@ -63,6 +63,7 @@ where let (frame, ret) = segment.read_frame(Frame::new_box_zeroed(), offset as u32).await; ret?; + debug_assert_eq!(frame.header().size_after(), 0, "all frames in a compacted segment should have size_after set to 0"); if frame.header().frame_no() >= until { yield frame; } From b3ada26d8b5f6a3383c19b6349cef2ea416ef084 Mon Sep 17 00:00:00 2001 From: ad hoc Date: Thu, 29 Aug 2024 15:46:55 +0200 Subject: [PATCH 24/52] add frame_count to segment header we could infer the number of frames for normal segments, but not for unordered segments. Count frames instead. 
--- libsql-wal/src/segment/current.rs | 7 ++++--- libsql-wal/src/segment/mod.rs | 11 +++++------ 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/libsql-wal/src/segment/current.rs b/libsql-wal/src/segment/current.rs index 7b4147fc68..d0bed5f716 100644 --- a/libsql-wal/src/segment/current.rs +++ b/libsql-wal/src/segment/current.rs @@ -73,6 +73,7 @@ impl CurrentSegment { salt: salt.into(), page_size: LIBSQL_PAGE_SIZE.into(), log_id: log_id.as_u128().into(), + frame_count: 0.into(), }; header.recompute_checksum(); @@ -96,7 +97,7 @@ impl CurrentSegment { } pub fn is_empty(&self) -> bool { - self.count_committed() == 0 + self.header.lock().is_empty() } pub fn with_header(&self, f: impl FnOnce(&SegmentHeader) -> R) -> R { @@ -113,7 +114,7 @@ impl CurrentSegment { } pub fn count_committed(&self) -> usize { - self.header.lock().count_committed() + self.header.lock().frame_count() } pub fn db_size(&self) -> u32 { @@ -398,7 +399,7 @@ impl CurrentSegment { F: FileExt, { let mut header = self.header.lock(); - let index_offset = header.count_committed() as u32; + let index_offset = header.frame_count() as u32; let index_byte_offset = checked_frame_offset(index_offset); let mut cursor = self.file.cursor(index_byte_offset); let writer = BufWriter::new(&mut cursor); diff --git a/libsql-wal/src/segment/mod.rs b/libsql-wal/src/segment/mod.rs index ed074125da..4828c52695 100644 --- a/libsql-wal/src/segment/mod.rs +++ b/libsql-wal/src/segment/mod.rs @@ -50,6 +50,8 @@ pub struct SegmentHeader { pub version: U16, pub start_frame_no: U64, pub last_commited_frame_no: U64, + /// number of frames in the segment + pub frame_count: U64, /// size of the database in pages, after applying the segment. pub size_after: U32, /// byte offset of the index. If 0, then the index wasn't written, and must be recovered. 
@@ -120,14 +122,11 @@ impl SegmentHeader { } fn is_empty(&self) -> bool { - self.last_commited_frame_no.get() == 0 + self.frame_count() == 0 } - fn count_committed(&self) -> usize { - self.last_commited_frame_no - .get() - .checked_sub(self.start_frame_no.get() - 1) - .unwrap_or(0) as usize + pub fn frame_count(&self) -> usize { + self.frame_count.get() as usize } pub fn last_committed(&self) -> u64 { From ebdc29ac557eebce2d4d876c6c31622d3bbb9264 Mon Sep 17 00:00:00 2001 From: ad hoc Date: Thu, 29 Aug 2024 15:47:40 +0200 Subject: [PATCH 25/52] add getters for CompactedSegment --- libsql-wal/src/segment/compacted.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/libsql-wal/src/segment/compacted.rs b/libsql-wal/src/segment/compacted.rs index ae0bb02a62..689964f2b4 100644 --- a/libsql-wal/src/segment/compacted.rs +++ b/libsql-wal/src/segment/compacted.rs @@ -39,6 +39,10 @@ impl CompactedSegmentDataHeader { Ok(()) } + + pub fn size_after(&self) -> u32 { + self.size_after.get() + } } #[derive(Debug, AsBytes, FromZeroes, FromBytes)] @@ -62,6 +66,10 @@ impl CompactedSegment { file: f(self.file), } } + + pub fn header(&self) -> &CompactedSegmentDataHeader { + &self.header + } } impl CompactedSegment { From 4bf5f758ac5559646fceefd8dc10537f4176c537 Mon Sep 17 00:00:00 2001 From: ad hoc Date: Thu, 29 Aug 2024 15:47:58 +0200 Subject: [PATCH 26/52] update frame count on commit --- libsql-wal/src/segment/current.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/libsql-wal/src/segment/current.rs b/libsql-wal/src/segment/current.rs index d0bed5f716..ca38153fdb 100644 --- a/libsql-wal/src/segment/current.rs +++ b/libsql-wal/src/segment/current.rs @@ -193,6 +193,10 @@ impl CurrentSegment { // set frames unordered because there are no guarantees that we received frames // in order. 
header.set_flags(header.flags().union(SegmentFlags::FRAME_UNORDERED)); + { + let savepoint = tx.savepoints.first().unwrap(); + header.frame_count = (header.frame_count.get() + (tx.next_offset - savepoint.next_offset) as u64).into(); + } header.recompute_checksum(); let (header, ret) = self @@ -299,6 +303,7 @@ impl CurrentSegment { } } + // commit if let Some(size_after) = size_after { if tx.not_empty() { let new_checksum = if let Some(offset) = tx.recompute_checksum { @@ -326,6 +331,11 @@ impl CurrentSegment { let mut header = { *self.header.lock() }; header.last_commited_frame_no = last_frame_no.into(); header.size_after = size_after.into(); + // count how many frames were appeneded: basically last appeneded offset - initial + // offset + let tx = tx.deref_mut(); + let savepoint = tx.savepoints.first().unwrap(); + header.frame_count = (header.frame_count.get() + (tx.next_offset - savepoint.next_offset) as u64).into(); header.recompute_checksum(); self.file.write_all_at(header.as_bytes(), 0)?; From 22bbc48599ee422725c0cdf0ca61cd1245b054f5 Mon Sep 17 00:00:00 2001 From: ad hoc Date: Thu, 29 Aug 2024 15:48:34 +0200 Subject: [PATCH 27/52] get txn out of owned txn guard --- libsql-wal/src/transaction.rs | 27 +++++++++++++-------------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/libsql-wal/src/transaction.rs b/libsql-wal/src/transaction.rs index 2fa593bade..e9b9159656 100644 --- a/libsql-wal/src/transaction.rs +++ b/libsql-wal/src/transaction.rs @@ -155,14 +155,23 @@ pub struct WriteTransaction { } pub struct TxGuardOwned { - _lock: Option>>, + lock: Option>>, inner: Option>, } +impl TxGuardOwned { + pub(crate) fn into_inner(mut self) -> WriteTransaction { + self.lock.take(); + self.inner.take().unwrap() + } +} + impl Drop for TxGuardOwned { fn drop(&mut self) { - let _ = self._lock.take(); - self.inner.take().expect("already dropped").downgrade(); + let _ = self.lock.take(); + if let Some(inner) = self.inner.take() { + inner.downgrade(); + } } } @@ -217,11 +226,6 @@ impl WriteTransaction { } pub fn lock(&mut self) -> TxGuard { - if self.is_commited { - tracing::error!("transaction already commited"); - todo!("txn has already been commited"); - } - let g = self.wal_lock.tx_id.lock_arc_blocking(); match *g { // we still hold the lock, we can proceed @@ -236,16 +240,11 @@ impl WriteTransaction { } pub fn into_lock_owned(self) -> TxGuardOwned { - if self.is_commited { - tracing::error!("transaction already commited"); - todo!("txn has already been commited"); - } - let g = self.wal_lock.tx_id.lock_arc_blocking(); match *g { // we still hold the lock, we can proceed Some(id) if self.id == id => TxGuardOwned { - _lock: Some(g), + lock: Some(g), inner: Some(self), }, // Somebody took the lock from us From 0a5754cdb93be2aa443f3d8268e41811ce9faad6 Mon Sep 17 00:00:00 2001 From: ad hoc Date: Thu, 29 Aug 2024 15:49:07 +0200 Subject: [PATCH 28/52] add ZeroCopyBuf::get_mut --- libsql-wal/src/io/buf.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/libsql-wal/src/io/buf.rs b/libsql-wal/src/io/buf.rs index f28718c8e8..ba51018d74 100644 --- a/libsql-wal/src/io/buf.rs +++ b/libsql-wal/src/io/buf.rs @@ -201,6 +201,11 @@ impl ZeroCopyBuf { unsafe { self.inner.assume_init_ref() } } + pub fn get_mut(&mut self) -> &mut T { + assert!(self.is_init()); + unsafe { self.inner.assume_init_mut() } + } + pub fn into_inner(self) -> T { assert!(self.is_init()); unsafe { self.inner.assume_init() } From b57a3e9ff03bd4910925cebe04809b030b4c055c Mon Sep 17 00:00:00 2001 From: ad hoc Date: Thu, 29 Aug 
2024 15:49:31 +0200 Subject: [PATCH 29/52] make SegmentHeader::flags public --- libsql-wal/src/segment/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libsql-wal/src/segment/mod.rs b/libsql-wal/src/segment/mod.rs index 4828c52695..a3f6d56441 100644 --- a/libsql-wal/src/segment/mod.rs +++ b/libsql-wal/src/segment/mod.rs @@ -99,7 +99,7 @@ impl SegmentHeader { } } - fn flags(&self) -> SegmentFlags { + pub fn flags(&self) -> SegmentFlags { SegmentFlags::from_bits(self.flags.get()).unwrap() } From e9c292deaf57c77d7c930541d4c73625238d50f9 Mon Sep 17 00:00:00 2001 From: ad hoc Date: Thu, 29 Aug 2024 15:50:11 +0200 Subject: [PATCH 30/52] validate segment key from s3 if the key didn't exist, s3 would return the next key for a different namespace. Check that the returned key matches the requested namespace. --- libsql-wal/src/storage/mod.rs | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/libsql-wal/src/storage/mod.rs b/libsql-wal/src/storage/mod.rs index 108d9de7d7..41caef739f 100644 --- a/libsql-wal/src/storage/mod.rs +++ b/libsql-wal/src/storage/mod.rs @@ -1,7 +1,7 @@ use std::collections::BTreeMap; use std::fmt; use std::future::Future; -use std::path::PathBuf; +use std::path::{Path, PathBuf}; use std::pin::Pin; use std::str::FromStr; use std::sync::Arc; @@ -85,6 +85,28 @@ impl SegmentKey { pub(crate) fn includes(&self, frame_no: u64) -> bool { (self.start_frame_no..=self.end_frame_no).contains(&frame_no) } + + #[tracing::instrument] + fn validate_from_path(mut path: &Path, ns: &NamespaceName) -> Option { + // path in the form "v2/clusters/{cluster-id}/namespaces/{namespace}/indexes/{index-key}" + let key: Self = path.file_name()?.to_str()?.parse().ok()?; + + path = path.parent()?; + + if path.file_name()? != "indexes" { + tracing::debug!("invalid key, ignoring"); + return None; + } + + path = path.parent()?; + + if path.file_name()? 
!= ns.as_str() { + tracing::debug!("invalid namespace for key"); + return None; + } + + Some(key) + } } impl From<&SegmentMeta> for SegmentKey { From ce633639985021026ba439e03e4ef3fac18e43b0 Mon Sep 17 00:00:00 2001 From: ad hoc Date: Thu, 29 Aug 2024 15:55:59 +0200 Subject: [PATCH 31/52] fmt --- libsql-server/src/lib.rs | 16 +++++----- libsql-server/src/main.rs | 7 ++--- libsql-wal/src/lib.rs | 2 +- libsql-wal/src/registry.rs | 43 ++++++++++++++++---------- libsql-wal/src/replication/injector.rs | 10 +++--- libsql-wal/src/segment/current.rs | 8 +++-- libsql-wal/src/segment/sealed.rs | 5 ++- 7 files changed, 54 insertions(+), 37 deletions(-) diff --git a/libsql-server/src/lib.rs b/libsql-server/src/lib.rs index 7d8e002f5b..3263db560b 100644 --- a/libsql-server/src/lib.rs +++ b/libsql-server/src/lib.rs @@ -979,10 +979,12 @@ where let namespace = conf.namespace().clone(); let path = dbs_path.join(namespace.as_str()); tokio::fs::create_dir_all(&path).await?; - tokio::task::spawn_blocking(move || registry.open(&path.join("data"), &namespace.into())) - .await - .unwrap()?; - } + tokio::task::spawn_blocking(move || { + registry.open(&path.join("data"), &namespace.into()) + }) + .await + .unwrap()?; + } if self.should_sync_from_storage { registry.sync_all(self.sync_conccurency).await?; @@ -1256,9 +1258,8 @@ where let is_previous_migration_successful = self.check_previous_migration_success()?; let is_libsql_wal = matches!(self.use_custom_wal, Some(CustomWAL::LibsqlWal)); let is_bottomless_enabled = self.db_config.bottomless_replication.is_some(); - let should_attempt_migration = is_bottomless_enabled - && !is_previous_migration_successful - && is_libsql_wal; + let should_attempt_migration = + is_bottomless_enabled && !is_previous_migration_successful && is_libsql_wal; if should_attempt_migration { bottomless_migrate(meta_store, base_config.clone(), primary_config.clone()).await?; @@ -1274,7 +1275,6 @@ where tracing::info!("bottomless already migrated, skipping..."); } - } Ok(false) diff --git a/libsql-server/src/main.rs b/libsql-server/src/main.rs index 5e91b4008f..5fd69cd133 100644 --- a/libsql-server/src/main.rs +++ b/libsql-server/src/main.rs @@ -285,14 +285,13 @@ struct Cli { #[clap( long, env = "LIBSQL_SYNC_FROM_STORAGE", - requires = "enable_bottomless_replication", + requires = "enable_bottomless_replication" )] sync_from_storage: bool, /// Whether to force loading all WAL at startup, with libsql-wal /// By default, WALs are loaded lazily, as the databases are openned. 
- #[clap( - long, - )] + /// Whether to force loading all wal at startup + #[clap(long)] force_load_wals: bool, /// Sync conccurency #[clap( diff --git a/libsql-wal/src/lib.rs b/libsql-wal/src/lib.rs index ce93bc7f08..475538b57d 100644 --- a/libsql-wal/src/lib.rs +++ b/libsql-wal/src/lib.rs @@ -179,7 +179,7 @@ pub mod test { loop { { if *shared.durable_frame_no.lock() >= current { - break + break; } } diff --git a/libsql-wal/src/registry.rs b/libsql-wal/src/registry.rs index f78b448ae9..5ae5c89ee2 100644 --- a/libsql-wal/src/registry.rs +++ b/libsql-wal/src/registry.rs @@ -151,7 +151,7 @@ fn maybe_store_segment( notifier: &tokio::sync::mpsc::Sender, namespace: &NamespaceName, durable_frame_no: &Arc>, - seg: S::Segment + seg: S::Segment, ) { if seg.is_storable() { let cb: OnStoreCallback = Box::new({ @@ -339,14 +339,14 @@ where let header = segment.header(); (header.size_after(), header.next_frame_no()) }) - .unwrap_or_else(|| match header { - Some(header) => ( - header.db_size.get(), - NonZeroU64::new(checkpointed_frame_no + 1) - .unwrap_or(NonZeroU64::new(1).unwrap()), - ), - None => (0, NonZeroU64::new(1).unwrap()), - }); + .unwrap_or_else(|| match header { + Some(header) => ( + header.db_size.get(), + NonZeroU64::new(checkpointed_frame_no + 1) + .unwrap_or(NonZeroU64::new(1).unwrap()), + ), + None => (0, NonZeroU64::new(1).unwrap()), + }); let current_segment_path = path.join(format!("{namespace}:{next_frame_no:020}.seg")); @@ -405,7 +405,8 @@ where /// Attempts to sync all loaded dbs with durable storage pub async fn sync_all(&self, conccurency: usize) -> Result<()> - where S: Storage, + where + S: Storage, { let mut join_set = JoinSet::new(); tracing::info!("syncing {} namespaces", self.opened.len()); @@ -413,7 +414,9 @@ where let before_sync = Instant::now(); let sem = Arc::new(Semaphore::new(conccurency)); for entry in self.opened.iter() { - let Slot::Wal(shared) = entry.value() else { panic!("all wals should already be opened") }; + let Slot::Wal(shared) = entry.value() else { + panic!("all wals should already be opened") + }; let storage = self.storage.clone(); let shared = shared.clone(); let sem = sem.clone(); @@ -499,14 +502,22 @@ where #[tracing::instrument(skip_all, fields(namespace = shared.namespace().as_str()))] async fn sync_one(shared: Arc>, storage: Arc) -> Result<()> -where IO: Io, - S: Storage +where + IO: Io, + S: Storage, { - let remote_durable_frame_no = storage.durable_frame_no(shared.namespace(), None).await.map_err(Box::new)?; + let remote_durable_frame_no = storage + .durable_frame_no(shared.namespace(), None) + .await + .map_err(Box::new)?; let local_current_frame_no = shared.current.load().next_frame_no().get() - 1; if remote_durable_frame_no > local_current_frame_no { - tracing::info!(remote_durable_frame_no, local_current_frame_no, "remote storage has newer segments"); + tracing::info!( + remote_durable_frame_no, + local_current_frame_no, + "remote storage has newer segments" + ); let mut seen = RoaringBitmap::new(); let replicator = StorageReplicator::new(storage, shared.namespace().clone()); let stream = replicator @@ -523,7 +534,7 @@ where IO: Io, frame.header_mut().frame_no(); frame.header_mut().set_size_after(seen.len() as _); injector.insert_frame(frame).await?; - break + break; } else { injector.insert_frame(frame).await?; } diff --git a/libsql-wal/src/replication/injector.rs b/libsql-wal/src/replication/injector.rs index 500f7fa8d0..b33e8d1ed9 100644 --- a/libsql-wal/src/replication/injector.rs +++ b/libsql-wal/src/replication/injector.rs @@ -20,13 
+20,13 @@ pub struct Injector { } impl Injector { - pub fn new( - wal: Arc>, - buffer_capacity: usize, - ) -> Result { + pub fn new(wal: Arc>, buffer_capacity: usize) -> Result { let mut tx = Transaction::Read(wal.begin_read(u64::MAX)); wal.upgrade(&mut tx)?; - let tx = tx.into_write().unwrap_or_else(|_| unreachable!()).into_lock_owned(); + let tx = tx + .into_write() + .unwrap_or_else(|_| unreachable!()) + .into_lock_owned(); Ok(Self { wal, buffer: Vec::with_capacity(buffer_capacity), diff --git a/libsql-wal/src/segment/current.rs b/libsql-wal/src/segment/current.rs index ca38153fdb..594e6b42c4 100644 --- a/libsql-wal/src/segment/current.rs +++ b/libsql-wal/src/segment/current.rs @@ -195,7 +195,9 @@ impl CurrentSegment { header.set_flags(header.flags().union(SegmentFlags::FRAME_UNORDERED)); { let savepoint = tx.savepoints.first().unwrap(); - header.frame_count = (header.frame_count.get() + (tx.next_offset - savepoint.next_offset) as u64).into(); + header.frame_count = (header.frame_count.get() + + (tx.next_offset - savepoint.next_offset) as u64) + .into(); } header.recompute_checksum(); @@ -335,7 +337,9 @@ impl CurrentSegment { // offset let tx = tx.deref_mut(); let savepoint = tx.savepoints.first().unwrap(); - header.frame_count = (header.frame_count.get() + (tx.next_offset - savepoint.next_offset) as u64).into(); + header.frame_count = (header.frame_count.get() + + (tx.next_offset - savepoint.next_offset) as u64) + .into(); header.recompute_checksum(); self.file.write_all_at(header.as_bytes(), 0)?; diff --git a/libsql-wal/src/segment/sealed.rs b/libsql-wal/src/segment/sealed.rs index 0e8f106ef0..1261f0029e 100644 --- a/libsql-wal/src/segment/sealed.rs +++ b/libsql-wal/src/segment/sealed.rs @@ -201,7 +201,10 @@ where // - in a replica: no need for storage // - in a primary, on recovery from storage: we don't want to override remote // segment. - !self.header().flags().contains(SegmentFlags::FRAME_UNORDERED) + !self + .header() + .flags() + .contains(SegmentFlags::FRAME_UNORDERED) } } From 07d05f7f9cc171b465ca4d9de43abcd085a997dd Mon Sep 17 00:00:00 2001 From: ad hoc Date: Thu, 29 Aug 2024 23:22:57 +0200 Subject: [PATCH 32/52] fix key len bug --- libsql-wal/src/registry.rs | 2 +- libsql-wal/src/storage/backend/s3.rs | 21 +++++++++++---------- libsql-wal/src/storage/mod.rs | 14 +++++++------- 3 files changed, 19 insertions(+), 18 deletions(-) diff --git a/libsql-wal/src/registry.rs b/libsql-wal/src/registry.rs index 5ae5c89ee2..3333a19e82 100644 --- a/libsql-wal/src/registry.rs +++ b/libsql-wal/src/registry.rs @@ -521,7 +521,7 @@ where let mut seen = RoaringBitmap::new(); let replicator = StorageReplicator::new(storage, shared.namespace().clone()); let stream = replicator - .stream(&mut seen, local_current_frame_no, 1) + .stream(&mut seen, remote_durable_frame_no, 1) .peekable(); let mut injector = Injector::new(shared.clone(), 10)?; // use pin to the heap so that we can drop the stream in the loop, and count `seen`. 
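For context on the width changes in the hunks that follow: segment keys order segments by formatting u64::MAX - frame_no zero-padded, so that lexicographic order over the formatted keys matches numeric order over frame numbers. A pad width of 19 is one digit short, because u64::MAX has 20 decimal digits: any value of 10^19 or more spills past the padding and no longer compares correctly against 19-digit values. A minimal standalone sketch (not part of the patch) demonstrating the bug and the fix:

fn main() {
    // u64::MAX = 18446744073709551615 has 20 decimal digits, so a pad width
    // of 19 cannot produce fixed-width strings for every u64.
    let small = 9_999_999_999_999_999_999_u64; // 19 digits
    let large = 10_000_000_000_000_000_000_u64; // 20 digits
    assert!(small < large);

    // With `{:019}`, `small` stays at 19 chars while `large` spills to 20:
    // string order now disagrees with numeric order ("9..." > "1...").
    assert!(format!("{:019}", small) > format!("{:019}", large));

    // With `{:020}` every u64 is padded to the full 20 digits, so string
    // order agrees with numeric order again ("09..." < "10...").
    assert!(format!("{:020}", small) < format!("{:020}", large));
}

This is why both the SegmentKey Display impl and s3_segment_index_lookup_key move from `{:019}` to `{:020}` below.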
diff --git a/libsql-wal/src/storage/backend/s3.rs b/libsql-wal/src/storage/backend/s3.rs index 61465df242..66aff1f2da 100644 --- a/libsql-wal/src/storage/backend/s3.rs +++ b/libsql-wal/src/storage/backend/s3.rs @@ -198,12 +198,14 @@ impl S3Backend { folder_key: &FolderKey<'_>, frame_no: u64, ) -> Result> { + let lookup_key_prefix = s3_segment_index_lookup_key_prefix(&folder_key); let lookup_key = s3_segment_index_lookup_key(&folder_key, frame_no); let objects = self .client .list_objects_v2() .bucket(&config.bucket) + .prefix(lookup_key_prefix) .start_after(lookup_key) .send() .await @@ -214,15 +216,10 @@ impl S3Backend { }; let key = contents.key().expect("misssing key?"); let key_path: &Path = key.as_ref(); - let segment_key: SegmentKey = key_path - .file_stem() - .expect("invalid key") - .to_str() - .expect("invalid key") - .parse() - .expect("invalid key"); - - Ok(Some(segment_key)) + + let key = SegmentKey::validate_from_path(key_path, &folder_key.namespace); + + Ok(key) } // This method could probably be optimized a lot by using indexes and only downloading useful @@ -335,8 +332,12 @@ fn s3_segment_index_key(folder_key: &FolderKey, segment_key: &SegmentKey) -> Str format!("{folder_key}/indexes/{segment_key}") } +fn s3_segment_index_lookup_key_prefix(folder_key: &FolderKey) -> String { + format!("{folder_key}/indexes/") +} + fn s3_segment_index_lookup_key(folder_key: &FolderKey, frame_no: u64) -> String { - format!("{folder_key}/indexes/{:019}", u64::MAX - frame_no) + format!("{folder_key}/indexes/{:020}", u64::MAX - frame_no) } impl Backend for S3Backend diff --git a/libsql-wal/src/storage/mod.rs b/libsql-wal/src/storage/mod.rs index 41caef739f..56ed5feff8 100644 --- a/libsql-wal/src/storage/mod.rs +++ b/libsql-wal/src/storage/mod.rs @@ -52,12 +52,12 @@ pub enum RestoreOptions { /// let meta = SegmentMeta { start_frame_no: 101, end_frame_no: 1000 }; /// map.insert(SegmentKey(&meta).to_string(), meta); /// -/// map.range(format!("{:019}", u64::MAX - 50)..).next(); -/// map.range(format!("{:019}", u64::MAX - 0)..).next(); -/// map.range(format!("{:019}", u64::MAX - 1)..).next(); -/// map.range(format!("{:019}", u64::MAX - 100)..).next(); -/// map.range(format!("{:019}", u64::MAX - 101)..).next(); -/// map.range(format!("{:019}", u64::MAX - 5000)..).next(); +/// map.range(format!("{:020}", u64::MAX - 50)..).next(); +/// map.range(format!("{:020}", u64::MAX - 0)..).next(); +/// map.range(format!("{:020}", u64::MAX - 1)..).next(); +/// map.range(format!("{:020}", u64::MAX - 100)..).next(); +/// map.range(format!("{:020}", u64::MAX - 101)..).next(); +/// map.range(format!("{:020}", u64::MAX - 5000)..).next(); /// ``` #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct SegmentKey { @@ -137,7 +137,7 @@ impl fmt::Display for SegmentKey { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!( f, - "{:019}-{:019}", + "{:020}-{:020}", u64::MAX - self.start_frame_no, u64::MAX - self.end_frame_no, ) From d55408200b91215ad6cea1fbdaa65bcc13bf33ce Mon Sep 17 00:00:00 2001 From: ad hoc Date: Fri, 30 Aug 2024 12:12:16 +0200 Subject: [PATCH 33/52] add durable frame_no to proto --- bottomless/src/replicator.rs | 1 + libsql-replication/proto/replication_log.proto | 1 + libsql-replication/src/generated/wal_log.rs | 2 ++ libsql-replication/src/replicator.rs | 2 ++ libsql-server/src/rpc/replication/libsql_replicator.rs | 2 ++ libsql-server/src/rpc/replication/replication_log.rs | 2 ++ libsql/src/replication/local_client.rs | 4 ++-- 7 files changed, 12 insertions(+), 2 deletions(-) diff --git 
a/bottomless/src/replicator.rs b/bottomless/src/replicator.rs index f8e362764a..5b9620d79b 100644 --- a/bottomless/src/replicator.rs +++ b/bottomless/src/replicator.rs @@ -1652,6 +1652,7 @@ impl Replicator { let frame = RpcFrame { data: frame_to_inject.bytes(), timestamp: None, + durable_frame_no: None, }; injector.inject_frame(frame).await?; applied_wal_frame = true; diff --git a/libsql-replication/proto/replication_log.proto b/libsql-replication/proto/replication_log.proto index b358232705..b2ba8c3bc0 100644 --- a/libsql-replication/proto/replication_log.proto +++ b/libsql-replication/proto/replication_log.proto @@ -36,6 +36,7 @@ message Frame { // if this frames is a commit frame, then this can be set // to the time when the transaction was commited optional int64 timestamp = 2; + optional int64 durable_frame_no = 3; } message Frames { diff --git a/libsql-replication/src/generated/wal_log.rs b/libsql-replication/src/generated/wal_log.rs index a34d5e59dd..2468347a32 100644 --- a/libsql-replication/src/generated/wal_log.rs +++ b/libsql-replication/src/generated/wal_log.rs @@ -83,6 +83,8 @@ pub struct Frame { /// to the time when the transaction was commited #[prost(int64, optional, tag = "2")] pub timestamp: ::core::option::Option, + #[prost(int64, optional, tag = "3")] + pub durable_frame_no: ::core::option::Option, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] diff --git a/libsql-replication/src/replicator.rs b/libsql-replication/src/replicator.rs index 92edbd4389..fc89813fe8 100644 --- a/libsql-replication/src/replicator.rs +++ b/libsql-replication/src/replicator.rs @@ -772,6 +772,7 @@ mod test { .map(|f| RpcFrame { data: f.bytes(), timestamp: None, + durable_frame_no: None, }) .take(2) .map(Ok) @@ -785,6 +786,7 @@ mod test { .map(|f| RpcFrame { data: f.bytes(), timestamp: None, + durable_frame_no: None, }) .map(Ok) .collect::>(); diff --git a/libsql-server/src/rpc/replication/libsql_replicator.rs b/libsql-server/src/rpc/replication/libsql_replicator.rs index 3f5fdb6dc4..e454ef0a79 100644 --- a/libsql-server/src/rpc/replication/libsql_replicator.rs +++ b/libsql-server/src/rpc/replication/libsql_replicator.rs @@ -102,6 +102,7 @@ where Poll::Ready(Some(Ok(RpcFrame { data, timestamp: None, + durable_frame_no: None, }))) } WalFlavor::Sqlite => { @@ -116,6 +117,7 @@ where Poll::Ready(Some(Ok(RpcFrame { data: frame.bytes(), timestamp: None, + durable_frame_no: None, }))) } } diff --git a/libsql-server/src/rpc/replication/replication_log.rs b/libsql-server/src/rpc/replication/replication_log.rs index 356d3d0f16..3abf06b283 100644 --- a/libsql-server/src/rpc/replication/replication_log.rs +++ b/libsql-server/src/rpc/replication/replication_log.rs @@ -187,6 +187,7 @@ fn map_frame_stream_output( Ok((frame, ts)) => Ok(Frame { data: frame.bytes(), timestamp: ts.map(|ts| ts.timestamp_millis()), + durable_frame_no: None, }), Err(LogReadError::SnapshotRequired) => Err(Status::new( tonic::Code::FailedPrecondition, @@ -431,6 +432,7 @@ mod snapshot_stream { yield Ok(Frame { data: libsql_replication::frame::Frame::from(frame).bytes(), timestamp: None, + durable_frame_no: None, }); } Err(e) => { diff --git a/libsql/src/replication/local_client.rs b/libsql/src/replication/local_client.rs index d3c713f530..421210fff0 100644 --- a/libsql/src/replication/local_client.rs +++ b/libsql/src/replication/local_client.rs @@ -47,7 +47,7 @@ impl ReplicatorClient for LocalClient { async fn next_frames(&mut self) -> Result { match self.frames.take() { Some(Frames::Vec(f)) 
=> { - let iter = f.into_iter().map(|f| RpcFrame { data: f.bytes(), timestamp: None }).map(Ok); + let iter = f.into_iter().map(|f| RpcFrame { data: f.bytes(), timestamp: None, durable_frame_no: None }).map(Ok); Ok(Box::pin(tokio_stream::iter(iter))) } Some(f @ Frames::Snapshot(_)) => { @@ -72,7 +72,7 @@ impl ReplicatorClient for LocalClient { next.header_mut().size_after = size_after.into(); } let frame = Frame::from(next); - yield RpcFrame { data: frame.bytes(), timestamp: None }; + yield RpcFrame { data: frame.bytes(), timestamp: None, durable_frame_no: None }; } }; From 631dd815d93d25e52f7b2920203bc66730d62f62 Mon Sep 17 00:00:00 2001 From: ad hoc Date: Fri, 30 Aug 2024 13:04:55 +0200 Subject: [PATCH 34/52] pass durable frame_no to libsql-injector --- libsql-replication/src/injector/libsql_injector.rs | 4 ++++ libsql-replication/src/injector/mod.rs | 2 ++ libsql-replication/src/injector/sqlite_injector/mod.rs | 3 +++ libsql-wal/src/replication/injector.rs | 9 +++++++++ 4 files changed, 18 insertions(+) diff --git a/libsql-replication/src/injector/libsql_injector.rs b/libsql-replication/src/injector/libsql_injector.rs index 7c01522e1d..f730881cf6 100644 --- a/libsql-replication/src/injector/libsql_injector.rs +++ b/libsql-replication/src/injector/libsql_injector.rs @@ -49,4 +49,8 @@ impl super::Injector for LibsqlInjector { .map_err(|e| Error::FatalInjectError(e.into()))?; Ok(None) } + + fn durable_frame_no(&mut self, frame_no: u64) { + self.injector.set_durable(frame_no); + } } diff --git a/libsql-replication/src/injector/mod.rs b/libsql-replication/src/injector/mod.rs index b139f07cc9..9519dc1a20 100644 --- a/libsql-replication/src/injector/mod.rs +++ b/libsql-replication/src/injector/mod.rs @@ -29,4 +29,6 @@ pub trait Injector { /// Trigger a dummy write, and flush the cache to trigger a call to xFrame. The buffer's frame /// are then injected into the wal. fn flush(&mut self) -> impl Future>> + Send; + + fn durable_frame_no(&mut self, frame_no: u64); } diff --git a/libsql-replication/src/injector/sqlite_injector/mod.rs b/libsql-replication/src/injector/sqlite_injector/mod.rs index f6ce2aa89f..ad336ab1d0 100644 --- a/libsql-replication/src/injector/sqlite_injector/mod.rs +++ b/libsql-replication/src/injector/sqlite_injector/mod.rs @@ -46,6 +46,9 @@ impl Injector for SqliteInjector { let inner = self.inner.clone(); spawn_blocking(move || inner.lock().flush()).await.unwrap() } + + #[inline] + fn durable_frame_no(&mut self, _frame_no: u64) { } } impl SqliteInjector { diff --git a/libsql-wal/src/replication/injector.rs b/libsql-wal/src/replication/injector.rs index b33e8d1ed9..d5198ead15 100644 --- a/libsql-wal/src/replication/injector.rs +++ b/libsql-wal/src/replication/injector.rs @@ -1,5 +1,6 @@ //! The injector is the module in charge of injecting frames into a replica database. +use std::sync::atomic::Ordering; use std::sync::Arc; use crate::error::Result; @@ -36,6 +37,14 @@ impl Injector { }) } + pub fn set_durable(&self, durable_frame_no: u64) { + let mut old = self.wal.durable_frame_no.lock(); + if *old < durable_frame_no { + *old = durable_frame_no + } { + todo!("primary reported older frameno than current"); + } + } pub async fn insert_frame(&mut self, frame: Box) -> Result> { let size_after = frame.size_after(); self.max_tx_frame_no = self.max_tx_frame_no.max(frame.header().frame_no()); From 3eb819dbac07925a9358e3236d580af6aeb4b21a Mon Sep 17 00:00:00 2001 From: ad hoc Date: Fri, 30 Aug 2024 14:03:01 +0200 Subject: [PATCH 35/52] fixup! 
add durable frame_no to proto --- libsql-replication/proto/replication_log.proto | 2 +- libsql-replication/src/generated/wal_log.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/libsql-replication/proto/replication_log.proto b/libsql-replication/proto/replication_log.proto index b2ba8c3bc0..a7500025a2 100644 --- a/libsql-replication/proto/replication_log.proto +++ b/libsql-replication/proto/replication_log.proto @@ -36,7 +36,7 @@ message Frame { // if this frames is a commit frame, then this can be set // to the time when the transaction was commited optional int64 timestamp = 2; - optional int64 durable_frame_no = 3; + optional uint64 durable_frame_no = 3; } message Frames { diff --git a/libsql-replication/src/generated/wal_log.rs b/libsql-replication/src/generated/wal_log.rs index 2468347a32..9f716eabfc 100644 --- a/libsql-replication/src/generated/wal_log.rs +++ b/libsql-replication/src/generated/wal_log.rs @@ -83,8 +83,8 @@ pub struct Frame { /// to the time when the transaction was commited #[prost(int64, optional, tag = "2")] pub timestamp: ::core::option::Option, - #[prost(int64, optional, tag = "3")] - pub durable_frame_no: ::core::option::Option, + #[prost(uint64, optional, tag = "3")] + pub durable_frame_no: ::core::option::Option, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] From 2e7685d693b633ca17aad40f5498e3396a6ed8a1 Mon Sep 17 00:00:00 2001 From: ad hoc Date: Sat, 31 Aug 2024 22:25:47 +0200 Subject: [PATCH 36/52] methods for getting/setting the durable frame_no through the injector --- libsql-wal/src/replication/injector.rs | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/libsql-wal/src/replication/injector.rs b/libsql-wal/src/replication/injector.rs index d5198ead15..5915324e68 100644 --- a/libsql-wal/src/replication/injector.rs +++ b/libsql-wal/src/replication/injector.rs @@ -18,6 +18,7 @@ pub struct Injector { capacity: usize, tx: TxGuardOwned, max_tx_frame_no: u64, + previous_durable_frame_no: u64, } impl Injector { @@ -37,14 +38,19 @@ }) } - pub fn set_durable(&self, durable_frame_no: u64) { + pub fn set_durable(&mut self, durable_frame_no: u64) { let mut old = self.wal.durable_frame_no.lock(); - if *old < durable_frame_no { - *old = durable_frame_no - } { - todo!("primary reported older frameno than current"); + if *old <= durable_frame_no { + self.previous_durable_frame_no = *old; + *old = durable_frame_no; + } else { + todo!("primary reported older frame_no than current"); } } + + pub fn current_durable(&self) -> u64 { + *self.wal.durable_frame_no.lock() + } pub async fn insert_frame(&mut self, frame: Box) -> Result> { let size_after = frame.size_after(); self.max_tx_frame_no = self.max_tx_frame_no.max(frame.header().frame_no()); From 12008ad10b3efc508287a06c47acfca299e2414c Mon Sep 17 00:00:00 2001 From: ad hoc Date: Sat, 31 Aug 2024 22:27:05 +0200 Subject: [PATCH 37/52] re-acquire txn on every new injection. the previous approach caused bugs: the savepoint was not updated, and the frame_count was not computed correctly.
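The gist of the change, as a reduced self-contained sketch (WriteTx, begin_write and Error are stand-ins for WriteTransaction, TxGuardOwned and the real locking, which the diff below shows in full): the injector's transaction becomes an Option that is re-acquired lazily before each injection and dropped at every commit boundary, so savepoints and frame_count always start from a fresh transaction.

#[derive(Debug)]
struct Error;

struct WriteTx {
    // each fresh transaction records where its first savepoint starts
    savepoint_start_offset: u64,
}

fn begin_write() -> Result<WriteTx, Error> {
    Ok(WriteTx { savepoint_start_offset: 0 })
}

struct Injector {
    tx: Option<WriteTx>,
}

impl Injector {
    // acquire a transaction only if none is active; called before every insert
    fn maybe_begin_txn(&mut self) -> Result<(), Error> {
        if self.tx.is_none() {
            self.tx = Some(begin_write()?);
        }
        Ok(())
    }

    fn insert_frame(&mut self) -> Result<(), Error> {
        self.maybe_begin_txn()?;
        // ... buffer the frame; flush when a commit frame arrives ...
        Ok(())
    }

    fn on_commit(&mut self) {
        // drop the guard at the transaction boundary: the next insert starts a
        // fresh transaction, so per-transaction state cannot leak across
        // injections the way a long-lived guard allowed
        self.tx.take();
    }
}

fn main() {
    let mut injector = Injector { tx: None };
    injector.insert_frame().unwrap();
    injector.on_commit();
    injector.insert_frame().unwrap(); // re-acquires a fresh transaction
}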
--- libsql-wal/src/replication/injector.rs | 66 +++++++++++++++++--------- 1 file changed, 43 insertions(+), 23 deletions(-) diff --git a/libsql-wal/src/replication/injector.rs b/libsql-wal/src/replication/injector.rs index 5915324e68..f0067840bc 100644 --- a/libsql-wal/src/replication/injector.rs +++ b/libsql-wal/src/replication/injector.rs @@ -16,25 +16,20 @@ pub struct Injector { buffer: Vec>, /// capacity of the frame buffer capacity: usize, - tx: TxGuardOwned, + tx: Option>, max_tx_frame_no: u64, previous_durable_frame_no: u64, } impl Injector { pub fn new(wal: Arc>, buffer_capacity: usize) -> Result { - let mut tx = Transaction::Read(wal.begin_read(u64::MAX)); - wal.upgrade(&mut tx)?; - let tx = tx - .into_write() - .unwrap_or_else(|_| unreachable!()) - .into_lock_owned(); Ok(Self { wal, buffer: Vec::with_capacity(buffer_capacity), capacity: buffer_capacity, - tx, + tx: None, max_tx_frame_no: 0, + previous_durable_frame_no: 0, }) } @@ -51,7 +46,22 @@ impl Injector { pub fn current_durable(&self) -> u64 { *self.wal.durable_frame_no.lock() } + pub fn maybe_begin_txn(&mut self) -> Result<()> { + if self.tx.is_none() { + let mut tx = Transaction::Read(self.wal.begin_read(u64::MAX)); + self.wal.upgrade(&mut tx)?; + let tx = tx + .into_write() + .unwrap_or_else(|_| unreachable!()) + .into_lock_owned(); + assert!(self.tx.replace(tx).is_none()); + } + + Ok(()) + } + pub async fn insert_frame(&mut self, frame: Box) -> Result> { + self.maybe_begin_txn()?; let size_after = frame.size_after(); self.max_tx_frame_no = self.max_tx_frame_no.max(frame.header().frame_no()); self.buffer.push(frame); @@ -64,28 +74,38 @@ impl Injector { } pub async fn flush(&mut self, size_after: Option) -> Result<()> { - let buffer = std::mem::take(&mut self.buffer); - let current = self.wal.current.load(); - let commit_data = size_after.map(|size| (size, self.max_tx_frame_no)); - if commit_data.is_some() { - self.max_tx_frame_no = 0; + if !self.buffer.is_empty() && self.tx.is_some() { + let last_committed_frame_no = self.max_tx_frame_no; + { + let tx = self.tx.as_mut().expect("we just checked that tx was there"); + let buffer = std::mem::take(&mut self.buffer); + let current = self.wal.current.load(); + let commit_data = size_after.map(|size| (size, self.max_tx_frame_no)); + if commit_data.is_some() { + self.max_tx_frame_no = 0; + } + let buffer = current + .inject_frames(buffer, commit_data, tx) + .await?; + self.buffer = buffer; + self.buffer.clear(); + } + + if size_after.is_some() { + let mut tx = self.tx.take().unwrap(); + self.wal.new_frame_notifier.send_replace(last_committed_frame_no); + } + } } - let buffer = current - .inject_frames(buffer, commit_data, &mut self.tx) - .await?; - self.buffer = buffer; - self.buffer.clear(); Ok(()) } pub fn rollback(&mut self) { self.buffer.clear(); - self.tx.reset(0); - } - - pub(crate) fn into_guard(self) -> TxGuardOwned { - self.tx + if let Some(tx) = self.tx.as_mut() { + tx.reset(0); + } } } From 718b27c4a16b47e85a8b44fc1e5182f1268be0b1 Mon Sep 17 00:00:00 2001 From: ad hoc Date: Sat, 31 Aug 2024 22:28:15 +0200 Subject: [PATCH 38/52] set durable frame_no upon injection --- libsql-replication/src/replicator.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/libsql-replication/src/replicator.rs b/libsql-replication/src/replicator.rs index fc89813fe8..f28bb9baa8 100644 --- a/libsql-replication/src/replicator.rs +++ b/libsql-replication/src/replicator.rs @@ -328,6 +328,10 @@ where async fn inject_frame(&mut self, frame: RpcFrame) -> Result<(), Error> { self.frames_synced 
+= 1; + if let Some(frame_no) = frame.durable_frame_no { + self.injector.durable_frame_no(frame_no); + } + match self.injector.inject_frame(frame).await? { Some(commit_fno) => { self.client.commit_frame_no(commit_fno).await?; From 59d6eb80a4c469cd42c21811d283e6453c699a6a Mon Sep 17 00:00:00 2001 From: ad hoc Date: Sat, 31 Aug 2024 22:29:16 +0200 Subject: [PATCH 39/52] replicator: send durable_frame_no --- .../src/rpc/replication/libsql_replicator.rs | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/libsql-server/src/rpc/replication/libsql_replicator.rs b/libsql-server/src/rpc/replication/libsql_replicator.rs index e454ef0a79..b0209c9a5d 100644 --- a/libsql-server/src/rpc/replication/libsql_replicator.rs +++ b/libsql-server/src/rpc/replication/libsql_replicator.rs @@ -13,6 +13,7 @@ use libsql_replication::rpc::replication::{ use libsql_wal::io::StdIO; use libsql_wal::registry::WalRegistry; use libsql_wal::segment::Frame; +use libsql_wal::shared_wal::SharedWal; use md5::{Digest as _, Md5}; use tokio_stream::Stream; use tonic::Status; @@ -72,12 +73,13 @@ pin_project_lite::pin_project! { #[pin] inner: S, flavor: WalFlavor, + shared: Arc>, } } impl FrameStreamAdapter { - fn new(inner: S, flavor: WalFlavor) -> Self { - Self { inner, flavor } + fn new(inner: S, flavor: WalFlavor, shared: Arc>) -> Self { + Self { inner, flavor, shared } } } @@ -93,6 +95,11 @@ where Some(Ok(f)) => { match this.flavor { WalFlavor::Libsql => { + let durable_frame_no = if f.header().is_commit() { + Some(this.shared.durable_frame_no()) + } else { + None + }; // safety: frame implemements zerocopy traits, so it can safely be interpreted as a // byte slize of the same size let bytes: Box<[u8; size_of::()]> = @@ -102,7 +109,7 @@ where Poll::Ready(Some(Ok(RpcFrame { data, timestamp: None, - durable_frame_no: None, + durable_frame_no, }))) } WalFlavor::Sqlite => { @@ -133,6 +140,7 @@ impl ReplicationLog for LibsqlReplicationService { type LogEntriesStream = BoxStream<'static, Result>; type SnapshotStream = BoxStream<'static, Result>; + #[tracing::instrument(skip_all, fields(namespace))] async fn log_entries( &self, req: tonic::Request, @@ -143,10 +151,10 @@ impl ReplicationLog for LibsqlReplicationService { let req = req.into_inner(); // TODO: replicator should only accecpt NonZero let replicator = - libsql_wal::replication::replicator::Replicator::new(shared, req.next_offset.max(1)); + libsql_wal::replication::replicator::Replicator::new(shared.clone(), req.next_offset.max(1)); let flavor = req.wal_flavor(); - let stream = FrameStreamAdapter::new(replicator.into_frame_stream(), flavor); + let stream = FrameStreamAdapter::new(replicator.into_frame_stream(), flavor, shared); Ok(tonic::Response::new(Box::pin(stream))) } From 980ef5bbbe7876c6b5969dc8b1b94af69f79c116 Mon Sep 17 00:00:00 2001 From: ad hoc Date: Sat, 31 Aug 2024 22:29:38 +0200 Subject: [PATCH 40/52] instrument checkpoint --- libsql-wal/src/checkpointer.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/libsql-wal/src/checkpointer.rs b/libsql-wal/src/checkpointer.rs index acbcd5539e..176140c2cc 100644 --- a/libsql-wal/src/checkpointer.rs +++ b/libsql-wal/src/checkpointer.rs @@ -52,6 +52,7 @@ where IO: Io, S: Sync + Send + 'static, { + #[tracing::instrument(skip(self))] fn checkpoint( &self, namespace: &NamespaceName, From 348c6830d60ade4b1b89a4a9363a1c51025dc50d Mon Sep 17 00:00:00 2001 From: ad hoc Date: Sat, 31 Aug 2024 22:31:26 +0200 Subject: [PATCH 41/52] introduce TxnGuard trait to pass either TxnGuardOwned/TxnGuardShared 
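The shape of the abstraction, reduced to a self-contained sketch (names simplified; the file generic, the locking, and the Send + Sync bounds of the real TxGuard are elided): both guard flavors deref to the same WriteTransaction, so an empty marker trait over Deref + DerefMut lets a call site such as swap_current accept either one as &dyn TxGuard.

use std::ops::{Deref, DerefMut};

struct WriteTransaction {
    id: u64,
}

// the marker trait: any guard that derefs to a WriteTransaction qualifies
trait TxGuard: Deref<Target = WriteTransaction> + DerefMut {}

// owned flavor, in the spirit of into_lock_owned()
struct TxGuardOwned {
    inner: WriteTransaction,
}

// borrowed flavor, in the spirit of lock()
struct TxGuardShared<'a> {
    inner: &'a mut WriteTransaction,
}

impl Deref for TxGuardOwned {
    type Target = WriteTransaction;
    fn deref(&self) -> &WriteTransaction {
        &self.inner
    }
}

impl DerefMut for TxGuardOwned {
    fn deref_mut(&mut self) -> &mut WriteTransaction {
        &mut self.inner
    }
}

impl<'a> Deref for TxGuardShared<'a> {
    type Target = WriteTransaction;
    fn deref(&self) -> &WriteTransaction {
        self.inner
    }
}

impl<'a> DerefMut for TxGuardShared<'a> {
    fn deref_mut(&mut self) -> &mut WriteTransaction {
        self.inner
    }
}

impl TxGuard for TxGuardOwned {}
impl<'a> TxGuard for TxGuardShared<'a> {}

// one entry point now serves both the WAL write path (shared guard) and the
// replication injector (owned guard)
fn swap_current(tx: &dyn TxGuard) {
    println!("swapping current segment under txn {}", tx.id);
}

fn main() {
    let mut wtx = WriteTransaction { id: 1 };
    swap_current(&TxGuardShared { inner: &mut wtx });
    swap_current(&TxGuardOwned { inner: WriteTransaction { id: 2 } });
}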
--- libsql-wal/src/registry.rs | 91 +++++++++++++++++++---------------- libsql-wal/src/shared_wal.rs | 4 +- libsql-wal/src/transaction.rs | 15 ++++-- 3 files changed, 61 insertions(+), 49 deletions(-) diff --git a/libsql-wal/src/registry.rs b/libsql-wal/src/registry.rs index 3333a19e82..32e982db61 100644 --- a/libsql-wal/src/registry.rs +++ b/libsql-wal/src/registry.rs @@ -27,7 +27,7 @@ use crate::segment::Segment; use crate::segment::{current::CurrentSegment, sealed::SealedSegment}; use crate::shared_wal::{SharedWal, SwapLog}; use crate::storage::{OnStoreCallback, Storage}; -use crate::transaction::{Transaction, TxGuard}; +use crate::transaction::TxGuard; use crate::{LibsqlFooter, LIBSQL_PAGE_SIZE}; use libsql_sys::name::NamespaceName; @@ -100,48 +100,13 @@ where S: Storage>, { #[tracing::instrument(skip_all)] - fn swap_current(&self, shared: &SharedWal, tx: &TxGuard<::File>) -> Result<()> { + fn swap_current( + &self, + shared: &SharedWal, + tx: &dyn TxGuard<::File>, + ) -> Result<()> { assert!(tx.is_commited()); - // at this point we must hold a lock to a commited transaction. - - let current = shared.current.load(); - if current.is_empty() { - return Ok(()); - } - let start_frame_no = current.next_frame_no(); - let path = self - .path - .join(shared.namespace().as_str()) - .join(format!("{}:{start_frame_no:020}.seg", shared.namespace())); - - let segment_file = self.io.open(true, true, true, &path)?; - let salt = self.io.with_rng(|rng| rng.gen()); - let new = CurrentSegment::create( - segment_file, - path, - start_frame_no, - current.db_size(), - current.tail().clone(), - salt, - current.log_id(), - )?; - // sealing must the last fallible operation, because we don't want to end up in a situation - // where the current log is sealed and it wasn't swapped. - if let Some(sealed) = current.seal()? { - new.tail().push(sealed.clone()); - maybe_store_segment( - self.storage.as_ref(), - &self.checkpoint_notifier, - &shared.namespace, - &shared.durable_frame_no, - sealed, - ); - } - - shared.current.swap(Arc::new(new)); - tracing::debug!("current segment swapped"); - - Ok(()) + self.swap_current_inner(shared) } } @@ -498,6 +463,48 @@ where Ok(()) } + + #[tracing::instrument(skip_all)] + fn swap_current_inner(&self, shared: &SharedWal) -> Result<()> { + let current = shared.current.load(); + if current.is_empty() { + return Ok(()); + } + let start_frame_no = current.next_frame_no(); + let path = self + .path + .join(shared.namespace().as_str()) + .join(format!("{}:{start_frame_no:020}.seg", shared.namespace())); + + let segment_file = self.io.open(true, true, true, &path)?; + let salt = self.io.with_rng(|rng| rng.gen()); + let new = CurrentSegment::create( + segment_file, + path, + start_frame_no, + current.db_size(), + current.tail().clone(), + salt, + current.log_id(), + )?; + // sealing must be the last fallible operation, because we don't want to end up in a situation + // where the current log is sealed and it wasn't swapped. + if let Some(sealed) = current.seal()?
{ + new.tail().push(sealed.clone()); + maybe_store_segment( + self.storage.as_ref(), + &self.checkpoint_notifier, + &shared.namespace, + &shared.durable_frame_no, + sealed, + ); + } + + shared.current.swap(Arc::new(new)); + tracing::debug!("current segment swapped"); + + Ok(()) + } } #[tracing::instrument(skip_all, fields(namespace = shared.namespace().as_str()))] diff --git a/libsql-wal/src/shared_wal.rs b/libsql-wal/src/shared_wal.rs index e94883fd01..8a0460c74c 100644 --- a/libsql-wal/src/shared_wal.rs +++ b/libsql-wal/src/shared_wal.rs @@ -35,7 +35,7 @@ pub struct WalLock { } pub(crate) trait SwapLog: Sync + Send + 'static { - fn swap_current(&self, shared: &SharedWal, tx: &TxGuard) -> Result<()>; + fn swap_current(&self, shared: &SharedWal, tx: &dyn TxGuard) -> Result<()>; } pub struct SharedWal { @@ -300,7 +300,7 @@ impl SharedWal { } /// Swap the current log. A write lock must be held, but the transaction must be committed already. - pub(crate) fn swap_current(&self, tx: &TxGuard) -> Result<()> { + pub(crate) fn swap_current(&self, tx: &impl TxGuard) -> Result<()> { self.registry.swap_current(self, tx)?; Ok(()) } diff --git a/libsql-wal/src/transaction.rs b/libsql-wal/src/transaction.rs index e9b9159656..8016f33f61 100644 --- a/libsql-wal/src/transaction.rs +++ b/libsql-wal/src/transaction.rs @@ -189,12 +189,17 @@ impl DerefMut for TxGuardOwned { } } -pub struct TxGuard<'a, F> { +pub trait TxGuard: Deref> + DerefMut + Send + Sync { } + +impl<'a, F: Send + Sync> TxGuard for TxGuardShared<'a, F> { } +impl TxGuard for TxGuardOwned { } + +pub struct TxGuardShared<'a, F> { _lock: async_lock::MutexGuardArc>, inner: &'a mut WriteTransaction, } -impl<'a, F> Deref for TxGuard<'a, F> { +impl<'a, F> Deref for TxGuardShared<'a, F> { type Target = WriteTransaction; fn deref(&self) -> &Self::Target { @@ -202,7 +207,7 @@ impl<'a, F> Deref for TxGuard<'a, F> { } } -impl<'a, F> DerefMut for TxGuard<'a, F> { +impl<'a, F> DerefMut for TxGuardShared<'a, F> { fn deref_mut(&mut self) -> &mut Self::Target { self.inner } @@ -225,11 +230,11 @@ impl WriteTransaction { savepoint_id } - pub fn lock(&mut self) -> TxGuard { + pub fn lock(&mut self) -> TxGuardShared { let g = self.wal_lock.tx_id.lock_arc_blocking(); match *g { // we still hold the lock, we can proceed - Some(id) if self.id == id => TxGuard { + Some(id) if self.id == id => TxGuardShared { _lock: g, inner: self, }, From 20bdf11ecc67f118772a17ba3ffcfa02410f5063 Mon Sep 17 00:00:00 2001 From: ad hoc Date: Sat, 31 Aug 2024 22:32:30 +0200 Subject: [PATCH 42/52] register unstored segments with checkpointer --- libsql-wal/src/registry.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/libsql-wal/src/registry.rs b/libsql-wal/src/registry.rs index 32e982db61..5f47b369a5 100644 --- a/libsql-wal/src/registry.rs +++ b/libsql-wal/src/registry.rs @@ -131,6 +131,8 @@ fn maybe_store_segment( }); storage.store(namespace, seg, None, cb); } else { + // segment can be checkpointed right away.
+ let _ = notifier.blocking_send(CheckpointMessage::Namespace(namespace.clone())); tracing::debug!("segment marked as not storable; skipping"); } } From 1e93bc8446ccdace92aa24646b05acbfe167c6aa Mon Sep 17 00:00:00 2001 From: ad hoc Date: Sat, 31 Aug 2024 22:32:48 +0200 Subject: [PATCH 43/52] do not seal segment in sync_one this is now done by the injector --- libsql-wal/src/registry.rs | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/libsql-wal/src/registry.rs b/libsql-wal/src/registry.rs index 5f47b369a5..55efd37481 100644 --- a/libsql-wal/src/registry.rs +++ b/libsql-wal/src/registry.rs @@ -552,18 +552,6 @@ where None => break, } } - - let mut tx = Transaction::Write(injector.into_guard().into_inner()); - let ret = { - let mut guard = tx.as_write_mut().unwrap().lock(); - guard.commit(); - // the current segment it unordered, no new frames should be appended to it, seal it and - // open a new segment - shared.swap_current(&guard) - }; - // make sure the tx is always ended before it's dropped! - tx.end(); - ret?; } tracing::info!("local database is up to date"); From e22ba695df6d0f4932def0be7158170b246b46c9 Mon Sep 17 00:00:00 2001 From: ad hoc Date: Sat, 31 Aug 2024 22:33:15 +0200 Subject: [PATCH 44/52] set durable_frame_no before injecting in sync_one --- libsql-wal/src/registry.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/libsql-wal/src/registry.rs b/libsql-wal/src/registry.rs index 55efd37481..e922903b3f 100644 --- a/libsql-wal/src/registry.rs +++ b/libsql-wal/src/registry.rs @@ -533,6 +533,9 @@ where .stream(&mut seen, remote_durable_frame_no, 1) .peekable(); let mut injector = Injector::new(shared.clone(), 10)?; + // we set the durable frame_no before we start injecting, because the wal may want to + // checkpoint on commit. + injector.set_durable(remote_durable_frame_no); // use pin to the heap so that we can drop the stream in the loop, and count `seen`. let mut stream = Box::pin(stream); loop { From 15e392c0e8acc05b698c1d553c7548fa8b88e5be Mon Sep 17 00:00:00 2001 From: ad hoc Date: Sat, 31 Aug 2024 22:34:30 +0200 Subject: [PATCH 45/52] swap segments in injector --- libsql-wal/src/replication/injector.rs | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/libsql-wal/src/replication/injector.rs b/libsql-wal/src/replication/injector.rs index f0067840bc..1862d309ef 100644 --- a/libsql-wal/src/replication/injector.rs +++ b/libsql-wal/src/replication/injector.rs @@ -1,6 +1,5 @@ //! The injector is the module in charge of injecting frames into a replica database. -use std::sync::atomic::Ordering; use std::sync::Arc; use crate::error::Result; @@ -46,6 +45,7 @@ impl Injector { pub fn current_durable(&self) -> u64 { *self.wal.durable_frame_no.lock() } + pub fn maybe_begin_txn(&mut self) -> Result<()> { if self.tx.is_none() { let mut tx = Transaction::Read(self.wal.begin_read(u64::MAX)); @@ -94,6 +94,15 @@ impl Injector { if size_after.is_some() { let mut tx = self.tx.take().unwrap(); self.wal.new_frame_notifier.send_replace(last_committed_frame_no); + // the strategy to swap the current log is to do it on change of durable boundary, + // when we have caught up with the current durable frame_no + if self.current_durable() != self.previous_durable_frame_no && self.current_durable() >= self.max_tx_frame_no { + let wal = self.wal.clone(); + // FIXME: tokio dependency here is annoying, we need an async version of swap_current. + tokio::task::spawn_blocking(move || { + tx.commit(); + wal.swap_current(&tx) + }).await.unwrap()? 
} } } From 34389a3998a01aec1c8d8906fdfc7bfe0c404273 Mon Sep 17 00:00:00 2001 From: ad hoc Date: Sat, 31 Aug 2024 22:36:54 +0200 Subject: [PATCH 46/52] add more logging --- libsql-wal/src/replication/replicator.rs | 7 +++++++ libsql-wal/src/segment/list.rs | 4 ++++ libsql-wal/src/segment/sealed.rs | 4 +++- libsql-wal/src/shared_wal.rs | 1 + libsql-wal/src/transaction.rs | 2 +- libsql-wal/src/wal.rs | 6 +++--- 6 files changed, 19 insertions(+), 5 deletions(-) diff --git a/libsql-wal/src/replication/replicator.rs b/libsql-wal/src/replication/replicator.rs index 4e5850736e..447ae169c2 100644 --- a/libsql-wal/src/replication/replicator.rs +++ b/libsql-wal/src/replication/replicator.rs @@ -40,17 +40,21 @@ impl Replicator { /// /// In a single replication step, the replicator guarantees that a minimal set of frames is /// sent to the replica. + #[tracing::instrument(skip(self))] pub fn into_frame_stream(mut self) -> impl Stream>> + Send { async_stream::try_stream! { loop { // First we decide up to what frame_no we want to replicate in this step. If we are // already up to date, wait for something to happen + tracing::debug!(next_frame_no = self.next_frame_no); let most_recent_frame_no = *self .new_frame_notifier .wait_for(|fno| *fno >= self.next_frame_no) .await .expect("channel cannot be closed because we hold a ref to the sending end"); + tracing::debug!(most_recent_frame_no, "new frame_no available"); + let mut commit_frame_no = 0; // we have stuff to replicate if most_recent_frame_no >= self.next_frame_no { @@ -66,6 +70,7 @@ impl Replicator { let mut stream = stream.peekable(); + tracing::debug!(replicated_until, "replicating from current log"); loop { let Some(frame) = stream.next().await else { break }; let mut frame = frame.map_err(|e| Error::CurrentSegment(e.into()))?; @@ -88,6 +93,7 @@ impl Replicator { .stream_pages_from(replicated_until, self.next_frame_no, &mut seen).await; tokio::pin!(stream); + tracing::debug!(replicated_until, "replicating from tail"); let mut stream = stream.peekable(); let should_replicate_from_storage = replicated_until != self.next_frame_no; @@ -110,6 +116,7 @@ impl Replicator { // Replicating from sealed segments was not enough, so we replicate from // durable storage if let Some(replicated_until) = replicated_until { + tracing::debug!("replicating from durable storage"); let stream = self .shared .stored_segments diff --git a/libsql-wal/src/segment/list.rs b/libsql-wal/src/segment/list.rs index 1708967bba..647ea9581c 100644 --- a/libsql-wal/src/segment/list.rs +++ b/libsql-wal/src/segment/list.rs @@ -78,6 +78,7 @@ where /// Checkpoints as many segments as possible to the main db file, and return the checkpointed /// frame_no, if anything was checkpointed + #[tracing::instrument(skip_all)] pub async fn checkpoint( &self, db_file: &IO::File, @@ -108,6 +109,7 @@ where // readers pointing to them while let Some(segment) = &*current { // skip any segment more recent than until_frame_no + tracing::debug!(last_committed = segment.last_committed(), until = until_frame_no); if segment.last_committed() <= until_frame_no { if !segment.is_checkpointable() { segs.clear(); @@ -120,6 +122,7 @@ where // nothing to checkpoint rn if segs.is_empty() { + tracing::debug!("nothing to checkpoint"); return Ok(None); } @@ -133,6 +136,7 @@ where let mut last_replication_index = 0; while let Some((k, v)) = union.next() { let page_no = u32::from_be_bytes(k.try_into().unwrap()); + tracing::debug!(page_no); let v = v.iter().min_by_key(|i| i.index).unwrap(); let offset = v.value as u32; 
diff --git a/libsql-wal/src/segment/sealed.rs b/libsql-wal/src/segment/sealed.rs index 1261f0029e..cc6b9d9342 100644 --- a/libsql-wal/src/segment/sealed.rs +++ b/libsql-wal/src/segment/sealed.rs @@ -181,7 +181,9 @@ where } fn is_checkpointable(&self) -> bool { - self.read_locks.load(Ordering::Relaxed) == 0 + let read_locks = self.read_locks.load(Ordering::Relaxed); + tracing::debug!(read_locks); + read_locks == 0 } fn size_after(&self) -> u32 { diff --git a/libsql-wal/src/shared_wal.rs b/libsql-wal/src/shared_wal.rs index 8a0460c74c..ee1d24cfc6 100644 --- a/libsql-wal/src/shared_wal.rs +++ b/libsql-wal/src/shared_wal.rs @@ -305,6 +305,7 @@ impl SharedWal { Ok(()) } + #[tracing::instrument(skip(self))] pub async fn checkpoint(&self) -> Result> { let durable_frame_no = *self.durable_frame_no.lock(); let checkpointed_frame_no = self diff --git a/libsql-wal/src/transaction.rs b/libsql-wal/src/transaction.rs index 8016f33f61..1545b3afc5 100644 --- a/libsql-wal/src/transaction.rs +++ b/libsql-wal/src/transaction.rs @@ -318,7 +318,7 @@ impl WriteTransaction { } } - tracing::debug!(id = read_tx.id, "lock released"); + tracing::trace!(id = read_tx.id, "lock released"); read_tx } diff --git a/libsql-wal/src/wal.rs b/libsql-wal/src/wal.rs index 4cbf81b10e..12dd75c7b0 100644 --- a/libsql-wal/src/wal.rs +++ b/libsql-wal/src/wal.rs @@ -131,7 +131,7 @@ impl Wal for LibsqlWal { self.last_read_frame_no = Some(tx.max_frame_no); self.tx = Some(Transaction::Read(tx)); - tracing::debug!(invalidate_cache, "read started"); + tracing::trace!(invalidate_cache, "read started"); Ok(invalidate_cache) } @@ -182,9 +182,9 @@ impl Wal for LibsqlWal { match self.tx.as_mut() { Some(tx) => { self.shared.upgrade(tx).map_err(Into::into)?; - tracing::debug!("write lock acquired"); + tracing::trace!("write lock acquired"); } - None => todo!("should acquire read txn first"), + None => panic!("should acquire read txn first"), } Ok(()) From 764a5f1c7c7453258ee728a297fc059ca076e83f Mon Sep 17 00:00:00 2001 From: ad hoc Date: Sat, 31 Aug 2024 22:38:02 +0200 Subject: [PATCH 47/52] fixup! 
introduce TxnGuard trait to pass either TxnGuardOwned/TxnGuardShared --- libsql-wal/src/segment/current.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/libsql-wal/src/segment/current.rs b/libsql-wal/src/segment/current.rs index 594e6b42c4..4f5a7d3924 100644 --- a/libsql-wal/src/segment/current.rs +++ b/libsql-wal/src/segment/current.rs @@ -23,7 +23,7 @@ use crate::io::file::FileExt; use crate::io::Inspect; use crate::segment::{checked_frame_offset, SegmentFlags}; use crate::segment::{frame_offset, page_offset, sealed::SealedSegment}; -use crate::transaction::{Transaction, TxGuard, TxGuardOwned}; +use crate::transaction::{Transaction, TxGuardShared, TxGuardOwned}; use crate::{LIBSQL_MAGIC, LIBSQL_PAGE_SIZE, LIBSQL_WAL_VERSION}; use super::list::SegmentList; @@ -231,7 +231,7 @@ impl CurrentSegment { &self, pages: impl Iterator, size_after: Option, - tx: &mut TxGuard, + tx: &mut TxGuardShared, ) -> Result> where F: FileExt, From a2a7fe325ebc1f8f560674ace2cdbd172ef38942 Mon Sep 17 00:00:00 2001 From: ad hoc Date: Sat, 31 Aug 2024 22:38:11 +0200 Subject: [PATCH 48/52] add durable_frame_no getter to SharedWal --- libsql-wal/src/shared_wal.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/libsql-wal/src/shared_wal.rs b/libsql-wal/src/shared_wal.rs index ee1d24cfc6..2df43ed060 100644 --- a/libsql-wal/src/shared_wal.rs +++ b/libsql-wal/src/shared_wal.rs @@ -96,6 +96,10 @@ impl SharedWal { self.current.load().log_id() } + pub fn durable_frame_no(&self) -> u64 { + *self.durable_frame_no.lock() + } + #[tracing::instrument(skip_all)] pub fn begin_read(&self, conn_id: u64) -> ReadTransaction { // FIXME: this is not enough to just increment the counter, we must make sure that the segment From c5210f256b040da06c589b94c481f88720d1d9dd Mon Sep 17 00:00:00 2001 From: ad hoc Date: Sat, 31 Aug 2024 22:38:38 +0200 Subject: [PATCH 49/52] remove useless comment --- libsql-wal/src/shared_wal.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/libsql-wal/src/shared_wal.rs b/libsql-wal/src/shared_wal.rs index 2df43ed060..daa0a51555 100644 --- a/libsql-wal/src/shared_wal.rs +++ b/libsql-wal/src/shared_wal.rs @@ -274,7 +274,6 @@ impl SharedWal { self.new_frame_notifier.send_replace(last_committed); } - // TODO: use config for max log size if tx.is_commited() && current.count_committed() > self.max_segment_size.load(Ordering::Relaxed) { From 7ad2188a2ff37a998422fd87703c3e167363f3ca Mon Sep 17 00:00:00 2001 From: ad hoc Date: Sat, 31 Aug 2024 22:39:06 +0200 Subject: [PATCH 50/52] fixup! 
add more logging --- libsql-wal/src/segment/list.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/libsql-wal/src/segment/list.rs b/libsql-wal/src/segment/list.rs index 647ea9581c..b14cf7830f 100644 --- a/libsql-wal/src/segment/list.rs +++ b/libsql-wal/src/segment/list.rs @@ -136,7 +136,7 @@ where let mut last_replication_index = 0; while let Some((k, v)) = union.next() { let page_no = u32::from_be_bytes(k.try_into().unwrap()); - tracing::debug!(page_no); + tracing::trace!(page_no); let v = v.iter().min_by_key(|i| i.index).unwrap(); let offset = v.value as u32; @@ -198,6 +198,8 @@ where self.len.fetch_sub(segs.len(), Ordering::Relaxed); + tracing::debug!(until = last_replication_index, "checkpointed"); + Ok(Some(last_replication_index)) } From 8918d29164b9df5ab7bdc74d86767fb45badf2e1 Mon Sep 17 00:00:00 2001 From: ad hoc Date: Sat, 31 Aug 2024 22:39:40 +0200 Subject: [PATCH 51/52] fmt --- .../src/injector/sqlite_injector/mod.rs | 2 +- .../src/rpc/replication/libsql_replicator.rs | 12 +++++++++--- libsql-wal/src/replication/injector.rs | 18 +++++++++++------- libsql-wal/src/segment/current.rs | 2 +- libsql-wal/src/segment/list.rs | 5 ++++- libsql-wal/src/shared_wal.rs | 2 +- libsql-wal/src/transaction.rs | 6 +++--- 7 files changed, 30 insertions(+), 17 deletions(-) diff --git a/libsql-replication/src/injector/sqlite_injector/mod.rs b/libsql-replication/src/injector/sqlite_injector/mod.rs index ad336ab1d0..17fec1c553 100644 --- a/libsql-replication/src/injector/sqlite_injector/mod.rs +++ b/libsql-replication/src/injector/sqlite_injector/mod.rs @@ -48,7 +48,7 @@ impl Injector for SqliteInjector { } #[inline] - fn durable_frame_no(&mut self, _frame_no: u64) { } + fn durable_frame_no(&mut self, _frame_no: u64) {} } impl SqliteInjector { diff --git a/libsql-server/src/rpc/replication/libsql_replicator.rs b/libsql-server/src/rpc/replication/libsql_replicator.rs index b0209c9a5d..19ece80749 100644 --- a/libsql-server/src/rpc/replication/libsql_replicator.rs +++ b/libsql-server/src/rpc/replication/libsql_replicator.rs @@ -79,7 +79,11 @@ pin_project_lite::pin_project! 
{ impl FrameStreamAdapter { fn new(inner: S, flavor: WalFlavor, shared: Arc>) -> Self { - Self { inner, flavor, shared } + Self { + inner, + flavor, + shared, + } } } @@ -150,8 +154,10 @@ impl ReplicationLog for LibsqlReplicationService { let shared = self.registry.get_async(&namespace.into()).await.unwrap(); let req = req.into_inner(); // TODO: replicator should only accecpt NonZero - let replicator = - libsql_wal::replication::replicator::Replicator::new(shared.clone(), req.next_offset.max(1)); + let replicator = libsql_wal::replication::replicator::Replicator::new( + shared.clone(), + req.next_offset.max(1), + ); let flavor = req.wal_flavor(); let stream = FrameStreamAdapter::new(replicator.into_frame_stream(), flavor, shared); diff --git a/libsql-wal/src/replication/injector.rs b/libsql-wal/src/replication/injector.rs index 1862d309ef..5a9349dc76 100644 --- a/libsql-wal/src/replication/injector.rs +++ b/libsql-wal/src/replication/injector.rs @@ -28,7 +28,7 @@ impl Injector { capacity: buffer_capacity, tx: None, max_tx_frame_no: 0, - previous_durable_frame_no: 0, + previous_durable_frame_no: 0, }) } @@ -84,25 +84,29 @@ impl Injector { if commit_data.is_some() { self.max_tx_frame_no = 0; } - let buffer = current - .inject_frames(buffer, commit_data, tx) - .await?; + let buffer = current.inject_frames(buffer, commit_data, tx).await?; self.buffer = buffer; self.buffer.clear(); } if size_after.is_some() { let mut tx = self.tx.take().unwrap(); - self.wal.new_frame_notifier.send_replace(last_committed_frame_no); + self.wal + .new_frame_notifier + .send_replace(last_committed_frame_no); // the strategy to swap the current log is to do it on change of durable boundary, // when we have caught up with the current durable frame_no - if self.current_durable() != self.previous_durable_frame_no && self.current_durable() >= self.max_tx_frame_no { + if self.current_durable() != self.previous_durable_frame_no + && self.current_durable() >= self.max_tx_frame_no + { let wal = self.wal.clone(); // FIXME: tokio dependency here is annoying, we need an async version of swap_current. tokio::task::spawn_blocking(move || { tx.commit(); wal.swap_current(&tx) - }).await.unwrap()? + }) + .await + .unwrap()? 
} } } diff --git a/libsql-wal/src/segment/current.rs b/libsql-wal/src/segment/current.rs index 4f5a7d3924..9b3b4ce5ac 100644 --- a/libsql-wal/src/segment/current.rs +++ b/libsql-wal/src/segment/current.rs @@ -23,7 +23,7 @@ use crate::io::file::FileExt; use crate::io::Inspect; use crate::segment::{checked_frame_offset, SegmentFlags}; use crate::segment::{frame_offset, page_offset, sealed::SealedSegment}; -use crate::transaction::{Transaction, TxGuardShared, TxGuardOwned}; +use crate::transaction::{Transaction, TxGuardOwned, TxGuardShared}; use crate::{LIBSQL_MAGIC, LIBSQL_PAGE_SIZE, LIBSQL_WAL_VERSION}; use super::list::SegmentList; diff --git a/libsql-wal/src/segment/list.rs b/libsql-wal/src/segment/list.rs index b14cf7830f..b50622d3ef 100644 --- a/libsql-wal/src/segment/list.rs +++ b/libsql-wal/src/segment/list.rs @@ -109,7 +109,10 @@ where // readers pointing to them while let Some(segment) = &*current { // skip any segment more recent than until_frame_no - tracing::debug!(last_committed = segment.last_committed(), until = until_frame_no); + tracing::debug!( + last_committed = segment.last_committed(), + until = until_frame_no + ); if segment.last_committed() <= until_frame_no { if !segment.is_checkpointable() { segs.clear(); diff --git a/libsql-wal/src/shared_wal.rs b/libsql-wal/src/shared_wal.rs index daa0a51555..d25b807dc0 100644 --- a/libsql-wal/src/shared_wal.rs +++ b/libsql-wal/src/shared_wal.rs @@ -99,7 +99,7 @@ impl SharedWal { pub fn durable_frame_no(&self) -> u64 { *self.durable_frame_no.lock() } - + #[tracing::instrument(skip_all)] pub fn begin_read(&self, conn_id: u64) -> ReadTransaction { // FIXME: this is not enough to just increment the counter, we must make sure that the segment diff --git a/libsql-wal/src/transaction.rs b/libsql-wal/src/transaction.rs index 1545b3afc5..93fb3fe16d 100644 --- a/libsql-wal/src/transaction.rs +++ b/libsql-wal/src/transaction.rs @@ -189,10 +189,10 @@ impl DerefMut for TxGuardOwned { } } -pub trait TxGuard: Deref> + DerefMut + Send + Sync { } +pub trait TxGuard: Deref> + DerefMut + Send + Sync {} -impl<'a, F: Send + Sync> TxGuard for TxGuardShared<'a, F> { } -impl TxGuard for TxGuardOwned { } +impl<'a, F: Send + Sync> TxGuard for TxGuardShared<'a, F> {} +impl TxGuard for TxGuardOwned {} pub struct TxGuardShared<'a, F> { _lock: async_lock::MutexGuardArc>, From fb082ac1429aa60658901410e1a6a535bc0a4030 Mon Sep 17 00:00:00 2001 From: ad hoc Date: Wed, 4 Sep 2024 11:07:15 +0200 Subject: [PATCH 52/52] Apply suggestions from code review --- libsql-server/src/main.rs | 2 +- libsql-wal/src/checkpointer.rs | 2 +- libsql-wal/src/registry.rs | 1 - libsql-wal/src/segment/sealed.rs | 4 ++-- 4 files changed, 4 insertions(+), 5 deletions(-) diff --git a/libsql-server/src/main.rs b/libsql-server/src/main.rs index 5fd69cd133..7295bf9d16 100644 --- a/libsql-server/src/main.rs +++ b/libsql-server/src/main.rs @@ -298,7 +298,7 @@ struct Cli { long, env = "LIBSQL_SYNC_CONCCURENCY", requires = "sync_from_storage", - default_value = "8", + default_value = "8" )] sync_conccurency: usize, } diff --git a/libsql-wal/src/checkpointer.rs b/libsql-wal/src/checkpointer.rs index 176140c2cc..98682481eb 100644 --- a/libsql-wal/src/checkpointer.rs +++ b/libsql-wal/src/checkpointer.rs @@ -158,7 +158,7 @@ where self.errors = 0; } } - Some(Err(e)) => panic!("checkoint task panicked: {e}"), + Some(Err(e)) => panic!("checkpoint task panicked: {e}"), None => unreachable!("got None, but join set is not empty") } } diff --git a/libsql-wal/src/registry.rs b/libsql-wal/src/registry.rs 
index e922903b3f..628def2cdf 100644 --- a/libsql-wal/src/registry.rs +++ b/libsql-wal/src/registry.rs @@ -543,7 +543,6 @@ where Some(Ok(mut frame)) => { if stream.peek().await.is_none() { drop(stream); - frame.header_mut().frame_no(); frame.header_mut().set_size_after(seen.len() as _); injector.insert_frame(frame).await?; break; diff --git a/libsql-wal/src/segment/sealed.rs b/libsql-wal/src/segment/sealed.rs index cc6b9d9342..f5f3b244a8 100644 --- a/libsql-wal/src/segment/sealed.rs +++ b/libsql-wal/src/segment/sealed.rs @@ -110,8 +110,8 @@ where while let Some((page_no_bytes, offset)) = pages.next() { let (mut b, ret) = self.read_frame_offset_async(offset as _, buffer).await; ret?; - // transaction boundaries in a segment are completely erased. The responsibility in on - // the user of the segment to place the transaction boundary such that all frames from + // transaction boundaries in a segment are completely erased. The responsibility is on + // the consumer of the segment to place the transaction boundary such that all frames from // the segment are applied within the same transaction. b.get_mut().header_mut().set_size_after(0); hasher.update(&b.get_ref().as_bytes());