diff --git a/Cargo.lock b/Cargo.lock
index 201ee78484..0d143018e5 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -973,6 +973,7 @@ dependencies = [
  "chrono",
  "clap 4.5.9",
  "libsql-rusqlite",
+ "libsql-sys",
  "tokio",
  "tracing",
  "tracing-subscriber",
diff --git a/Dockerfile b/Dockerfile
index 6146dbb17d..da0cd751ee 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -36,6 +36,7 @@ RUN if [ "$ENABLE_FEATURES" == "" ]; then \
     else \
         cargo build -p libsql-server --features "$ENABLE_FEATURES" --release ; \
     fi
+RUN cargo build -p bottomless-cli --release

 # official gosu install instruction (https://github.com/tianon/gosu/blob/master/INSTALL.md)
 FROM debian:bullseye-slim as gosu
@@ -86,6 +87,7 @@ COPY docker-wrapper.sh /usr/local/bin
 COPY --from=gosu /usr/local/bin/gosu /usr/local/bin/gosu
 COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certificates.crt
 COPY --from=builder /target/release/sqld /bin/sqld
+COPY --from=builder /target/release/bottomless-cli /bin/bottomless-cli

 USER root
diff --git a/bottomless-cli/Cargo.toml b/bottomless-cli/Cargo.toml
index 1ab99f1f76..264611b748 100644
--- a/bottomless-cli/Cargo.toml
+++ b/bottomless-cli/Cargo.toml
@@ -19,6 +19,7 @@ bottomless = { version = "0", path = "../bottomless" }
 bytes = "1"
 chrono = "0.4.23"
 clap = { version = "4.0.29", features = ["derive"] }
+libsql-sys = { path = "../libsql-sys" }
 tokio = { version = "1.23.0", features = ["macros", "rt", "rt-multi-thread"] }
 tracing = "0.1.37"
 tracing-subscriber = "0.3.16"
diff --git a/bottomless-cli/src/main.rs b/bottomless-cli/src/main.rs
index af3184ccaf..d81b3ac39c 100644
--- a/bottomless-cli/src/main.rs
+++ b/bottomless-cli/src/main.rs
@@ -1,8 +1,10 @@
-use anyhow::Result;
+use anyhow::{anyhow, Result};
 use aws_sdk_s3::Client;
 use bytes::Bytes;
 use chrono::NaiveDateTime;
 use clap::{Parser, Subcommand};
+use libsql_sys::{connection::NO_AUTOCHECKPOINT, wal::Sqlite3WalManager};
+use rusqlite::params;
 use std::path::PathBuf;

 mod replicator_extras;
@@ -25,10 +27,24 @@ struct Cli {
     namespace: Option<String>,
     #[clap(long)]
     encryption_key: Option<Bytes>,
+    #[clap(long)]
+    db_name: Option<String>,
 }

 #[derive(Debug, Subcommand)]
 enum Commands {
+    #[clap(about = "Copy bottomless generation locally")]
+    Copy {
+        #[clap(long, short, long_help = "Generation to copy (latest by default)")]
+        generation: Option<Uuid>,
+        #[clap(long, short, long_help = "Target local directory")]
+        to_dir: String,
+    },
+    #[clap(about = "Create new generation from database")]
+    Create {
+        #[clap(long, short, long_help = "Path to the source database file")]
+        source_db_path: String,
+    },
     #[clap(about = "List available generations")]
     Ls {
         #[clap(long, short, long_help = "List details about single generation")]
@@ -113,6 +129,34 @@ enum Commands {
     },
 }

+async fn detect_database(options: &Cli, namespace: &str) -> Result<(String, String)> {
+    let database = match options.database.clone() {
+        Some(db) => db,
+        None => {
+            let client = Client::from_conf({
+                let mut loader = aws_config::defaults(aws_config::BehaviorVersion::latest());
+                if let Some(endpoint) = options.endpoint.clone() {
+                    loader = loader.endpoint_url(endpoint);
+                }
+                aws_sdk_s3::config::Builder::from(&loader.load().await)
+                    .force_path_style(true)
+                    .build()
+            });
+            let bucket = options.bucket.as_deref().unwrap_or("bottomless");
+            match detect_db(&client, bucket, namespace).await {
+                Some(db) => db,
+                None => {
+                    return Err(anyhow!(
+                        "Could not autodetect the database. Please pass it explicitly with -d option"
+                    ));
+                }
+            }
+        }
+    };
+    let database_dir = database + "/dbs/" + namespace.strip_prefix("ns-").unwrap();
+    let database = database_dir.clone() + "/data";
+    tracing::info!("Database: '{}' (namespace: {})", database, namespace);
+    Ok((database, database_dir))
+}
+
 async fn run() -> Result<()> {
     tracing_subscriber::fmt::init();
     let mut options = Cli::parse();
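For reference, `detect_database` derives two paths from the bucket-level database prefix and the namespace. A minimal standalone sketch of that layout (the inputs `my-bucket-db` and `ns-default` are illustrative, only the string arithmetic mirrors the patch):

```rust
// Sketch: the path layout derived by detect_database().
fn layout(database: String, namespace: &str) -> (String, String) {
    let database_dir = database + "/dbs/" + namespace.strip_prefix("ns-").unwrap();
    let database = database_dir.clone() + "/data";
    (database, database_dir)
}

fn main() {
    let (database, database_dir) = layout("my-bucket-db".to_string(), "ns-default");
    assert_eq!(database_dir, "my-bucket-db/dbs/default");
    assert_eq!(database, "my-bucket-db/dbs/default/data");
}
```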
@@ -189,56 +233,109 @@ async fn run() -> Result<()> {
             std::str::from_utf8(encryption_key)?,
         );
     }
-    let namespace = options.namespace.as_deref().unwrap_or("ns-default");
-    std::env::set_var("LIBSQL_BOTTOMLESS_DATABASE_ID", namespace);
-    let database = match options.database.clone() {
-        Some(db) => db,
-        None => {
-            let client = Client::from_conf({
-                let mut loader = aws_config::defaults(aws_config::BehaviorVersion::latest());
-                if let Some(endpoint) = options.endpoint.clone() {
-                    loader = loader.endpoint_url(endpoint);
-                }
-                aws_sdk_s3::config::Builder::from(&loader.load().await)
-                    .force_path_style(true)
-                    .build()
-            });
-            let bucket = options.bucket.as_deref().unwrap_or("bottomless");
-            match detect_db(&client, bucket, namespace).await {
-                Some(db) => db,
-                None => {
-                    println!("Could not autodetect the database. Please pass it explicitly with -d option");
-                    return Ok(());
-                }
-            }
+    let namespace_init = std::env::var("LIBSQL_BOTTOMLESS_DATABASE_ID").unwrap_or(String::new());
+    if options.db_name.is_some() && options.namespace.is_some() {
+        return Err(anyhow!(
+            "only one of the arguments --db-name or --namespace is expected to be set"
+        ));
+    }
+    if let Some(ref db_name) = options.db_name {
+        if namespace_init != "" {
+            std::env::set_var(
+                "LIBSQL_BOTTOMLESS_DATABASE_ID",
+                format!("ns-{}:{}", &namespace_init, db_name),
+            );
+        } else {
+            return Err(anyhow!(
+                "db_name can be set only if LIBSQL_BOTTOMLESS_DATABASE_ID env var has namespace ID"
+            ));
         }
-    };
-    let database_dir = database + "/dbs/" + namespace.strip_prefix("ns-").unwrap();
-    let database = database_dir.clone() + "/data";
-    tracing::info!("Database: '{}' (namespace: {})", database, namespace);
-
-    let mut client = Replicator::new(database.clone()).await?;
-
+    } else {
+        let namespace = options.namespace.as_deref().unwrap_or("ns-default");
+        std::env::set_var("LIBSQL_BOTTOMLESS_DATABASE_ID", namespace);
+    }
+    let namespace = std::env::var("LIBSQL_BOTTOMLESS_DATABASE_ID").unwrap();
+    if namespace_init != namespace {
+        tracing::info!(
+            "LIBSQL_BOTTOMLESS_DATABASE_ID env var was updated: '{}' -> '{}'",
+            namespace_init,
+            namespace
+        );
+    }
     match options.command {
+        Commands::Create { ref source_db_path } => {
+            let mut client =
+                Replicator::new(detect_database(&options, &namespace).await?.0).await?;
+
+            let db_path = PathBuf::from(client.db_path.clone());
+            let db_dir = db_path.parent().unwrap();
+            if db_dir.exists() {
+                return Err(anyhow!("directory for fresh generation must be empty"));
+            }
+            if options.namespace.is_none() {
+                return Err(anyhow!("namespace must be specified explicitly"));
+            }
+            std::fs::create_dir_all(db_dir)?;
+            tracing::info!(
+                "created temporary directory for fresh generation: {}",
+                db_dir.to_str().unwrap()
+            );
+            let options = bottomless::replicator::Options::from_env()?;
+            if options.encryption_config.is_some() {
+                return Err(anyhow!("creation from encrypted DB is not supported"));
+            }
+            let connection = libsql_sys::Connection::open(
+                format!("file:{}?mode=ro", source_db_path),
+                rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY
+                    | rusqlite::OpenFlags::SQLITE_OPEN_URI
+                    | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX,
+                Sqlite3WalManager::new(),
+                NO_AUTOCHECKPOINT,
+                None,
+            )?;
+            tracing::info!(
+                "ready to VACUUM source database file {} from read-only connection to the DB {}",
+                &source_db_path,
+                &client.db_path
+            );
+            let _ = connection.execute("VACUUM INTO ?", params![&client.db_path])?;
+            let _ = client.new_generation().await;
+            tracing::info!("set generation {} for replicator", client.generation()?);
+            client.snapshot_main_db_file(true).await?;
+            client.wait_until_snapshotted().await?;
+            println!("snapshot uploaded for generation: {}", client.generation()?);
+            return Ok(());
+        }
+        Commands::Copy { generation, to_dir } => {
+            let temp = std::env::temp_dir().join("bottomless-copy-temp-dir");
+            let mut client = Replicator::new(temp.display().to_string()).await?;
+            client.copy(generation, to_dir).await?;
+        }
         Commands::Ls {
             generation,
             limit,
             older_than,
             newer_than,
             verbose,
-        } => match generation {
-            Some(gen) => client.list_generation(gen).await?,
-            None => {
-                client
-                    .list_generations(limit, older_than, newer_than, verbose)
-                    .await?
+        } => {
+            let temp = std::env::temp_dir().join("bottomless-ls-temp-dir");
+            let client = Replicator::new(temp.display().to_string()).await?;
+            match generation {
+                Some(gen) => client.list_generation(gen).await?,
+                None => {
+                    client
+                        .list_generations(limit, older_than, newer_than, verbose)
+                        .await?
+                }
             }
-        },
+        }
         Commands::Restore {
             generation,
             utc_time,
             ..
         } => {
+            let (database, database_dir) = detect_database(&options, &namespace).await?;
+            let mut client = Replicator::new(database.clone()).await?;
             tokio::fs::create_dir_all(&database_dir).await?;
             client.restore(generation, utc_time).await?;
             let db_path = PathBuf::from(&database);
@@ -252,9 +349,15 @@ async fn run() -> Result<()> {
             generation,
             utc_time,
         } => {
-            let temp = std::env::temp_dir().join("bottomless-verification-do-not-touch");
+            let temp: PathBuf = std::env::temp_dir().join("bottomless-verify-temp-dir");
             let mut client = Replicator::new(temp.display().to_string()).await?;
             let _ = tokio::fs::remove_file(&temp).await;
+            tracing::info!(
+                "ready to restore DB from generation '{}'",
+                &generation
+                    .map(|x| x.to_string())
+                    .unwrap_or(String::from(""))
+            );
             client.restore(generation, utc_time).await?;
             let size = tokio::fs::metadata(&temp).await?.len();
             println!("Snapshot size: {size}");
@@ -270,15 +373,23 @@ async fn run() -> Result<()> {
             generation,
             older_than,
             verbose,
-        } => match (generation, older_than) {
-            (None, Some(older_than)) => client.remove_many(older_than, verbose).await?,
-            (Some(generation), None) => client.remove(generation, verbose).await?,
-            (Some(_), Some(_)) => unreachable!(),
-            (None, None) => println!(
-                "rm command cannot be run without parameters; see -h or --help for details"
-            ),
-        },
+        } => {
+            let (database, _) = detect_database(&options, &namespace).await?;
+            let client = Replicator::new(database.clone()).await?;
+
+            match (generation, older_than) {
+                (None, Some(older_than)) => client.remove_many(older_than, verbose).await?,
+                (Some(generation), None) => client.remove(generation, verbose).await?,
+                (Some(_), Some(_)) => unreachable!(),
+                (None, None) => println!(
+                    "rm command cannot be run without parameters; see -h or --help for details"
+                ),
+            }
+        }
         Commands::Snapshot { generation } => {
+            let (database, database_dir) = detect_database(&options, &namespace).await?;
+            let mut client = Replicator::new(database.clone()).await?;
+            tokio::fs::create_dir_all(&database_dir).await?;
             let generation = if let Some(gen) = generation {
                 gen
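The `Create` command above boils down to three steps: open the source database read-only, write a compacted copy to the replicator's path with `VACUUM INTO`, then start a fresh generation and upload a snapshot. A standalone sketch of the `VACUUM INTO` step, using plain `rusqlite` rather than the `libsql_sys::Connection` wrapper the patch goes through (paths are illustrative):

```rust
// Sketch: copy a SQLite database via VACUUM INTO, as Commands::Create does.
use rusqlite::{params, Connection, OpenFlags};

fn vacuum_into(source_db_path: &str, dest_db_path: &str) -> rusqlite::Result<()> {
    let connection = Connection::open_with_flags(
        format!("file:{source_db_path}?mode=ro"),
        OpenFlags::SQLITE_OPEN_READ_ONLY
            | OpenFlags::SQLITE_OPEN_URI
            | OpenFlags::SQLITE_OPEN_NO_MUTEX,
    )?;
    // VACUUM INTO leaves the source untouched and writes a defragmented
    // copy to the destination path, which must not exist yet.
    connection.execute("VACUUM INTO ?", params![dest_db_path])?;
    Ok(())
}
```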
diff --git a/bottomless/src/replicator.rs b/bottomless/src/replicator.rs
index fead5f02df..f8e362764a 100644
--- a/bottomless/src/replicator.rs
+++ b/bottomless/src/replicator.rs
@@ -29,7 +29,7 @@ use std::sync::Arc;
 use tokio::fs::{File, OpenOptions};
 use tokio::io::AsyncWriteExt;
 use tokio::sync::watch::{channel, Receiver, Sender};
-use tokio::sync::Mutex;
+use tokio::sync::{Mutex, Semaphore};
 use tokio::task::JoinHandle;
 use tokio::task::JoinSet;
 use tokio::time::Duration;
@@ -70,7 +70,7 @@ pub struct Replicator {
     use_compression: CompressionKind,
     encryption_config: Option<EncryptionConfig>,
     max_frames_per_batch: usize,
-    s3_upload_max_parallelism: usize,
+    s3_max_parallelism: usize,
     join_set: JoinSet<()>,
     upload_progress: Arc<Mutex<UploadProgress>>,
     last_uploaded_frame_no: Receiver<u32>,
@@ -117,7 +117,7 @@ pub struct Options {
     /// checkpoint never commits.
     pub max_batch_interval: Duration,
     /// Maximum number of S3 file upload requests that may happen in parallel.
-    pub s3_upload_max_parallelism: usize,
+    pub s3_max_parallelism: usize,
     /// Max number of retries for S3 operations
     pub s3_max_retries: u32,
     /// Skip snapshot upload per checkpoint.
@@ -198,7 +198,7 @@ impl Options {
         let region = env_var("LIBSQL_BOTTOMLESS_AWS_DEFAULT_REGION").ok();
         let max_frames_per_batch =
             env_var_or("LIBSQL_BOTTOMLESS_BATCH_MAX_FRAMES", 10000).parse::<usize>()?;
-        let s3_upload_max_parallelism =
+        let s3_max_parallelism =
             env_var_or("LIBSQL_BOTTOMLESS_S3_PARALLEL_MAX", 32).parse::<usize>()?;
         let use_compression =
             CompressionKind::parse(&env_var_or("LIBSQL_BOTTOMLESS_COMPRESSION", "zstd"))
@@ -246,7 +246,7 @@ impl Options {
             encryption_config,
             max_batch_interval,
             max_frames_per_batch,
-            s3_upload_max_parallelism,
+            s3_max_parallelism,
             aws_endpoint,
             access_key_id,
             secret_access_key,
@@ -413,7 +413,7 @@ impl Replicator {
         let _s3_upload = {
             let client = client.clone();
             let bucket = options.bucket_name.clone();
-            let max_parallelism = options.s3_upload_max_parallelism;
+            let max_parallelism = options.s3_max_parallelism;
             let upload_progress = upload_progress.clone();
             let db_name = db_name.clone();
             let shutdown_watch = Arc::new(shutdown_watch);
@@ -493,7 +493,7 @@ impl Replicator {
             use_compression: options.use_compression,
             encryption_config: options.encryption_config,
             max_frames_per_batch: options.max_frames_per_batch,
-            s3_upload_max_parallelism: options.s3_upload_max_parallelism,
+            s3_max_parallelism: options.s3_max_parallelism,
             skip_snapshot: options.skip_snapshot,
             join_set,
             upload_progress,
@@ -1279,17 +1279,13 @@ impl Replicator {
         timestamp: Option<NaiveDateTime>,
     ) -> Result<(RestoreAction, bool)> {
         tracing::debug!("restoring from");
-        if let Some(tombstone) = self.get_tombstone().await? {
-            if let Some(timestamp) = Self::generation_to_timestamp(&generation) {
-                if tombstone.and_utc().timestamp() as u64 >= timestamp.to_unix().0 {
-                    bail!(
-                        "Couldn't restore from generation {}. Database '{}' has been tombstoned at {}.",
-                        generation,
-                        self.db_name,
-                        tombstone
-                    );
-                }
-            }
+        if let Some(tombstoned_at) = self.is_tombstoned(generation).await? {
+            bail!(
+                "Couldn't restore from generation {}. Database '{}' has been tombstoned at {}.",
+                generation,
+                self.db_name,
+                tombstoned_at
+            );
         }

         let start_ts = Instant::now();
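The tombstone check that used to live inline here moves into a reusable `is_tombstoned` helper (added further down in this file). The rule it encodes: a generation is considered tombstoned when the database's tombstone time is at or after the timestamp embedded in the generation UUID. A sketch of that comparison, assuming generations are time-ordered UUIDs as `generation_to_timestamp` implies:

```rust
// Sketch: the comparison behind is_tombstoned(). Bottomless generation IDs
// embed a Unix timestamp, so a tombstone written at or after that instant
// invalidates the generation.
use uuid::Uuid;

fn tombstoned(tombstone_unix_secs: u64, generation: &Uuid) -> bool {
    match generation.get_timestamp() {
        Some(ts) => tombstone_unix_secs >= ts.to_unix().0, // (secs, subsec nanos)
        None => false, // UUID carries no timestamp; nothing to compare
    }
}
```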
@@ -1685,6 +1681,94 @@ impl Replicator {
         Ok(())
     }

+    pub async fn copy(&mut self, generation: Option<Uuid>, to_dir: String) -> Result<()> {
+        let generation = self
+            .choose_generation(generation, None)
+            .await
+            .ok_or(anyhow!("generation not found"))?;
+
+        if let Some(tombstoned_at) = self.is_tombstoned(generation).await? {
+            tracing::warn!("generation was tombstoned at {tombstoned_at}");
+        }
+        std::fs::create_dir_all(PathBuf::from(&to_dir))?;
+
+        let prefix = format!("{}-{}/", self.db_name, generation);
+        let mut marker: Option<String> = None;
+        tracing::info!(
+            "ready to copy S3 content from directory {} to local directory {} (parallelism: {})",
+            &prefix,
+            &to_dir,
+            self.s3_max_parallelism
+        );
+        loop {
+            let mut list_request = self.list_objects().prefix(&prefix);
+            if let Some(marker) = marker.take() {
+                list_request = list_request.marker(marker);
+            }
+            let semaphore = Arc::new(Semaphore::new(self.s3_max_parallelism));
+            let mut group = JoinSet::new();
+            let list_response = list_request.send().await?;
+            for entry in list_response.contents() {
+                let key = String::from(entry.key().unwrap());
+                marker = Some(key.clone());
+
+                let request = self
+                    .client
+                    .get_object()
+                    .bucket(&self.bucket)
+                    .key(key.clone());
+                let to_dir = to_dir.clone();
+                let entry_size = entry.size().unwrap_or(0);
+                let semaphore = semaphore.clone();
+                group.spawn(async move {
+                    let acquired = semaphore.acquire().await.unwrap();
+                    if let Ok(response) = request.send().await {
+                        tracing::debug!("start copy of entry {} (size {} bytes)", &key, entry_size);
+                        let entry_name = key.split('/').last().unwrap();
+                        let mut entry_path = PathBuf::from(&to_dir);
+                        entry_path.push(entry_name);
+
+                        let mut entry_file = OpenOptions::new()
+                            .create(true)
+                            .write(true)
+                            .read(true)
+                            .truncate(true)
+                            .open(entry_path)
+                            .await
+                            .unwrap();
+                        let mut body_reader = response.body.into_async_read();
+                        tokio::io::copy(&mut body_reader, &mut entry_file)
+                            .await
+                            .unwrap();
+                        tracing::debug!("finish copy of entry {}", &key);
+                    }
+                    drop(acquired);
+                });
+            }
+            while let Some(_) = group.join_next().await {}
+            if marker.is_none() {
+                break;
+            }
+        }
+        Ok(())
+    }
+
+    async fn choose_generation(
+        &mut self,
+        generation: Option<Uuid>,
+        timestamp: Option<NaiveDateTime>,
+    ) -> Option<Uuid> {
+        match generation {
+            Some(gen) => Some(gen),
+            None => match self.latest_generation_before(timestamp.as_ref()).await {
+                Some(gen) => Some(gen),
+                None => None,
+            },
+        }
+    }
+
     /// Restores the database state from newest remote generation
     /// On success, returns the RestoreAction, and whether the database was recovered from backup.
     pub async fn restore(
@@ -1693,15 +1777,12 @@ impl Replicator {
         timestamp: Option<NaiveDateTime>,
     ) -> Result<(RestoreAction, bool)> {
         tracing::debug!("restoring with {generation:?} at {timestamp:?}");
-        let generation = match generation {
-            Some(gen) => gen,
-            None => match self.latest_generation_before(timestamp.as_ref()).await {
-                Some(gen) => gen,
-                None => {
-                    tracing::debug!("No generation found, nothing to restore");
-                    return Ok((RestoreAction::SnapshotMainDbFile, false));
-                }
-            },
+        let generation = match self.choose_generation(generation, timestamp).await {
+            Some(generation) => generation,
+            None => {
+                tracing::debug!("No generation found, nothing to restore");
+                return Ok((RestoreAction::SnapshotMainDbFile, false));
+            }
         };

         let (action, recovered) = self.restore_from(generation, timestamp).await?;
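`copy` pages through the generation's objects with S3 list markers and downloads each page with bounded parallelism: a `Semaphore` caps in-flight requests at `s3_max_parallelism` while a `JoinSet` awaits the batch. The pattern in isolation (the task body is a stand-in for the `GetObject` call and file write):

```rust
// Sketch: bounded-parallelism fan-out, as used by Replicator::copy.
use std::sync::Arc;
use tokio::{sync::Semaphore, task::JoinSet};

#[tokio::main]
async fn main() {
    let max_parallelism = 4; // stands in for self.s3_max_parallelism
    let semaphore = Arc::new(Semaphore::new(max_parallelism));
    let mut group = JoinSet::new();
    for key in (0..32).map(|i| format!("object-{i:04}")) {
        let semaphore = semaphore.clone();
        group.spawn(async move {
            // The permit is held for the whole task, so at most
            // `max_parallelism` downloads run at once.
            let _permit = semaphore.acquire().await.unwrap();
            println!("downloading {key}"); // stand-in for GetObject + copy to disk
        });
    }
    // Drain the JoinSet, mirroring the `while let Some(_) = group.join_next()` loop.
    while group.join_next().await.is_some() {}
}
```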
@@ -1751,7 +1832,7 @@ impl Replicator {
         let dir = format!("{}/{}-{}", self.bucket, self.db_name, generation);
         if tokio::fs::try_exists(&dir).await? {
             let mut files = tokio::fs::read_dir(&dir).await?;
-            let sem = Arc::new(tokio::sync::Semaphore::new(self.s3_upload_max_parallelism));
+            let sem = Arc::new(tokio::sync::Semaphore::new(self.s3_max_parallelism));
             while let Some(file) = files.next_entry().await? {
                 let fpath = file.path();
                 if let Some(key) = Self::fpath_to_key(&fpath, &prefix) {
@@ -1780,9 +1861,7 @@ impl Replicator {
             }
         }
         // wait for all started upload tasks to finish
-        let _ = sem
-            .acquire_many(self.s3_upload_max_parallelism as u32)
-            .await?;
+        let _ = sem.acquire_many(self.s3_max_parallelism as u32).await?;
         if let Err(e) = tokio::fs::remove_dir(&dir).await {
             tracing::warn!("Couldn't remove backed up directory {}: {}", dir, e);
         }
@@ -1876,6 +1955,17 @@ impl Replicator {
         Ok(delete_task)
     }

+    pub async fn is_tombstoned(&self, generation: Uuid) -> Result<Option<NaiveDateTime>> {
+        if let Some(tombstone) = self.get_tombstone().await? {
+            if let Some(timestamp) = Self::generation_to_timestamp(&generation) {
+                if (tombstone.and_utc().timestamp() as u64) >= timestamp.to_unix().0 {
+                    return Ok(Some(tombstone));
+                }
+            }
+        }
+        Ok(None)
+    }
+
     /// Checks if current replicator database has been marked as deleted.
     pub async fn get_tombstone(&self) -> Result<Option<NaiveDateTime>> {
         let key = format!("{}.tombstone", self.db_name);
diff --git a/libsql-server/src/namespace/meta_store.rs b/libsql-server/src/namespace/meta_store.rs
index 3b4ea97730..d94ffaa7aa 100644
--- a/libsql-server/src/namespace/meta_store.rs
+++ b/libsql-server/src/namespace/meta_store.rs
@@ -125,7 +125,7 @@ pub async fn metastore_connection_maker(
         bucket_name: config.bucket_name,
         max_frames_per_batch: 10_000,
         max_batch_interval: config.backup_interval,
-        s3_upload_max_parallelism: 32,
+        s3_max_parallelism: 32,
         s3_max_retries: 10,
         skip_snapshot: false,
     };
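Taken together, the new surface can be driven either through the `bottomless-cli` binary now shipped in the Docker image or programmatically. A sketch of the programmatic path, assuming bottomless is configured through its usual `LIBSQL_BOTTOMLESS_*` environment variables (the database path and target directory here are illustrative):

```rust
// Sketch: exercising the APIs this patch adds to bottomless::replicator.
use bottomless::replicator::Replicator;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Replicator::new reads its S3 configuration from the environment.
    let mut client = Replicator::new("data.sqld/dbs/default/data".to_string()).await?;

    // Copy the objects of the newest generation into a local directory;
    // passing Some(uuid) instead of None selects a specific generation.
    client.copy(None, "./generation-copy".to_string()).await?;
    Ok(())
}
```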