Skip to content

Commit

Permalink
Merge pull request #1700 from tursodatabase/bottomless-cli-impr
Browse files Browse the repository at this point in the history
implement create and copy bottomless-cli commands
  • Loading branch information
sivukhin authored Aug 27, 2024
2 parents 7e0f3d7 + 84c2fa1 commit f34e52a
Show file tree
Hide file tree
Showing 6 changed files with 283 additions and 78 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ RUN if [ "$ENABLE_FEATURES" == "" ]; then \
else \
cargo build -p libsql-server --features "$ENABLE_FEATURES" --release ; \
fi
RUN cargo build -p bottomless-cli --release

# official gosu install instruction (https://github.com/tianon/gosu/blob/master/INSTALL.md)
FROM debian:bullseye-slim as gosu
Expand Down Expand Up @@ -86,6 +87,7 @@ COPY docker-wrapper.sh /usr/local/bin
COPY --from=gosu /usr/local/bin/gosu /usr/local/bin/gosu
COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certificates.crt
COPY --from=builder /target/release/sqld /bin/sqld
COPY --from=builder /target/release/bottomless-cli /bin/bottomless-cli

USER root

Expand Down
1 change: 1 addition & 0 deletions bottomless-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ bottomless = { version = "0", path = "../bottomless" }
bytes = "1"
chrono = "0.4.23"
clap = { version = "4.0.29", features = ["derive"] }
libsql-sys = { path = "../libsql-sys" }
tokio = { version = "1.23.0", features = ["macros", "rt", "rt-multi-thread"] }
tracing = "0.1.37"
tracing-subscriber = "0.3.16"
Expand Down
203 changes: 157 additions & 46 deletions bottomless-cli/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use anyhow::Result;
use anyhow::{anyhow, Result};
use aws_sdk_s3::Client;
use bytes::Bytes;
use chrono::NaiveDateTime;
use clap::{Parser, Subcommand};
use libsql_sys::{connection::NO_AUTOCHECKPOINT, wal::Sqlite3WalManager};
use rusqlite::params;
use std::path::PathBuf;

mod replicator_extras;
Expand All @@ -25,10 +27,24 @@ struct Cli {
namespace: Option<String>,
#[clap(long)]
encryption_key: Option<Bytes>,
#[clap(long)]
db_name: Option<String>,
}

#[derive(Debug, Subcommand)]
enum Commands {
#[clap(about = "Copy bottomless generation locally")]
Copy {
#[clap(long, short, long_help = "Generation to copy (latest by default)")]
generation: Option<uuid::Uuid>,
#[clap(long, short, long_help = "Target local directory")]
to_dir: String,
},
#[clap(about = "Create new generation from database")]
Create {
#[clap(long, short, long_help = "Path to the source database file")]
source_db_path: String,
},
#[clap(about = "List available generations")]
Ls {
#[clap(long, short, long_help = "List details about single generation")]
Expand Down Expand Up @@ -113,6 +129,34 @@ enum Commands {
},
}

async fn detect_database(options: &Cli, namespace: &str) -> Result<(String, String)> {
let database = match options.database.clone() {
Some(db) => db,
None => {
let client = Client::from_conf({
let mut loader = aws_config::defaults(aws_config::BehaviorVersion::latest());
if let Some(endpoint) = options.endpoint.clone() {
loader = loader.endpoint_url(endpoint);
}
aws_sdk_s3::config::Builder::from(&loader.load().await)
.force_path_style(true)
.build()
});
let bucket = options.bucket.as_deref().unwrap_or("bottomless");
match detect_db(&client, bucket, namespace).await {
Some(db) => db,
None => {
return Err(anyhow!("Could not autodetect the database. Please pass it explicitly with -d option"));
}
}
}
};
let database_dir = database + "/dbs/" + namespace.strip_prefix("ns-").unwrap();
let database = database_dir.clone() + "/data";
tracing::info!("Database: '{}' (namespace: {})", database, namespace);
return Ok((database, database_dir));
}

async fn run() -> Result<()> {
tracing_subscriber::fmt::init();
let mut options = Cli::parse();
Expand Down Expand Up @@ -189,56 +233,109 @@ async fn run() -> Result<()> {
std::str::from_utf8(encryption_key)?,
);
}
let namespace = options.namespace.as_deref().unwrap_or("ns-default");
std::env::set_var("LIBSQL_BOTTOMLESS_DATABASE_ID", namespace);
let database = match options.database.clone() {
Some(db) => db,
None => {
let client = Client::from_conf({
let mut loader = aws_config::defaults(aws_config::BehaviorVersion::latest());
if let Some(endpoint) = options.endpoint.clone() {
loader = loader.endpoint_url(endpoint);
}
aws_sdk_s3::config::Builder::from(&loader.load().await)
.force_path_style(true)
.build()
});
let bucket = options.bucket.as_deref().unwrap_or("bottomless");
match detect_db(&client, bucket, namespace).await {
Some(db) => db,
None => {
println!("Could not autodetect the database. Please pass it explicitly with -d option");
return Ok(());
}
}
let namespace_init = std::env::var("LIBSQL_BOTTOMLESS_DATABASE_ID").unwrap_or(String::new());
if options.db_name.is_some() && options.namespace.is_some() {
return Err(anyhow!(
"only one of the arguments --db-name or --namespace is expected to be set"
));
}
if let Some(ref db_name) = options.db_name {
if namespace_init != "" {
std::env::set_var(
"LIBSQL_BOTTOMLESS_DATABASE_ID",
format!("ns-{}:{}", &namespace_init, db_name),
);
} else {
return Err(anyhow!(
"db_name can be set only if LIBSQL_BOTTOMLESS_DATABASE_ID env var has namespace ID"
));
}
};
let database_dir = database + "/dbs/" + namespace.strip_prefix("ns-").unwrap();
let database = database_dir.clone() + "/data";
tracing::info!("Database: '{}' (namespace: {})", database, namespace);

let mut client = Replicator::new(database.clone()).await?;

} else {
let namespace = options.namespace.as_deref().unwrap_or("ns-default");
std::env::set_var("LIBSQL_BOTTOMLESS_DATABASE_ID", namespace);
}
let namespace = std::env::var("LIBSQL_BOTTOMLESS_DATABASE_ID").unwrap();
if namespace_init != namespace {
tracing::info!(
"LIBSQL_BOTTOMLESS_DATABASE_ID env var were updated: '{}' -> '{}'",
namespace_init,
namespace
);
}
match options.command {
Commands::Create { ref source_db_path } => {
let mut client =
Replicator::new(detect_database(&options, &namespace).await?.0).await?;

let db_path = PathBuf::from(client.db_path.clone());
let db_dir = db_path.parent().unwrap();
if db_dir.exists() {
return Err(anyhow!("directory for fresh generation must be empty"));
}
if options.namespace.is_none() {
return Err(anyhow!("namespace must be specified explicitly"));
}
std::fs::create_dir_all(db_dir)?;
tracing::info!(
"created temporary directory for fresh generation: {}",
db_dir.to_str().unwrap()
);
let options = bottomless::replicator::Options::from_env()?;
if options.encryption_config.is_some() {
return Err(anyhow!("creation from encrypted DB is not supported"));
}
let connection = libsql_sys::Connection::open(
format!("file:{}?mode=ro", source_db_path),
rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY
| rusqlite::OpenFlags::SQLITE_OPEN_URI
| rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX,
Sqlite3WalManager::new(),
NO_AUTOCHECKPOINT,
None,
)?;
tracing::info!(
"read to VACUUM source database file {} from read-only connection to the DB {}",
&source_db_path,
&client.db_path
);
let _ = connection.execute("VACUUM INTO ?", params![&client.db_path])?;
let _ = client.new_generation().await;
tracing::info!("set generation {} for replicator", client.generation()?);
client.snapshot_main_db_file(true).await?;
client.wait_until_snapshotted().await?;
println!("snapshot uploaded for generation: {}", client.generation()?);
return Ok(());
}
Commands::Copy { generation, to_dir } => {
let temp = std::env::temp_dir().join("bottomless-copy-temp-dir");
let mut client = Replicator::new(temp.display().to_string()).await?;
client.copy(generation, to_dir).await?;
}
Commands::Ls {
generation,
limit,
older_than,
newer_than,
verbose,
} => match generation {
Some(gen) => client.list_generation(gen).await?,
None => {
client
.list_generations(limit, older_than, newer_than, verbose)
.await?
} => {
let temp = std::env::temp_dir().join("bottomless-ls-temp-dir");
let client = Replicator::new(temp.display().to_string()).await?;
match generation {
Some(gen) => client.list_generation(gen).await?,
None => {
client
.list_generations(limit, older_than, newer_than, verbose)
.await?
}
}
},
}
Commands::Restore {
generation,
utc_time,
..
} => {
let (database, database_dir) = detect_database(&options, &namespace).await?;
let mut client = Replicator::new(database.clone()).await?;
tokio::fs::create_dir_all(&database_dir).await?;
client.restore(generation, utc_time).await?;
let db_path = PathBuf::from(&database);
Expand All @@ -252,9 +349,15 @@ async fn run() -> Result<()> {
generation,
utc_time,
} => {
let temp = std::env::temp_dir().join("bottomless-verification-do-not-touch");
let temp: PathBuf = std::env::temp_dir().join("bottomless-verify-temp-dir");
let mut client = Replicator::new(temp.display().to_string()).await?;
let _ = tokio::fs::remove_file(&temp).await;
tracing::info!(
"ready to restore DB from generation '{}'",
&generation
.map(|x| x.to_string())
.unwrap_or(String::from(""))
);
client.restore(generation, utc_time).await?;
let size = tokio::fs::metadata(&temp).await?.len();
println!("Snapshot size: {size}");
Expand All @@ -270,15 +373,23 @@ async fn run() -> Result<()> {
generation,
older_than,
verbose,
} => match (generation, older_than) {
(None, Some(older_than)) => client.remove_many(older_than, verbose).await?,
(Some(generation), None) => client.remove(generation, verbose).await?,
(Some(_), Some(_)) => unreachable!(),
(None, None) => println!(
"rm command cannot be run without parameters; see -h or --help for details"
),
},
} => {
let (database, _) = detect_database(&options, &namespace).await?;
let client = Replicator::new(database.clone()).await?;

match (generation, older_than) {
(None, Some(older_than)) => client.remove_many(older_than, verbose).await?,
(Some(generation), None) => client.remove(generation, verbose).await?,
(Some(_), Some(_)) => unreachable!(),
(None, None) => println!(
"rm command cannot be run without parameters; see -h or --help for details"
),
}
}
Commands::Snapshot { generation } => {
let (database, database_dir) = detect_database(&options, &namespace).await?;
let mut client = Replicator::new(database.clone()).await?;

tokio::fs::create_dir_all(&database_dir).await?;
let generation = if let Some(gen) = generation {
gen
Expand Down
Loading

0 comments on commit f34e52a

Please sign in to comment.