diff --git a/.gitignore b/.gitignore
index 28e75a82..f13c5bf1 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,4 +3,4 @@
 /.DS_Store
 /.idea
 .history
-**/*_workspace/**
\ No newline at end of file
+**/*_workspace/
\ No newline at end of file
diff --git a/Cargo.lock b/Cargo.lock
index cb9ba6f0..12d9b5a8 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2828,6 +2828,7 @@ dependencies = [
  "optd-gungnir",
  "prettytable-rs",
  "regex",
+ "serde_json",
  "shlex",
  "statistical",
  "tokio",
diff --git a/optd-perftest/Cargo.toml b/optd-perftest/Cargo.toml
index 7c7511a0..c677430d 100644
--- a/optd-perftest/Cargo.toml
+++ b/optd-perftest/Cargo.toml
@@ -41,6 +41,7 @@ tokio-util = "0.7"
 futures-util = "0.3"
 statistical = "1.0"
 prettytable-rs = "0.10"
+serde_json = "1.0"
 
 [dev_dependencies]
 assert_cmd = "2.0"
diff --git a/optd-perftest/src/benchmark.rs b/optd-perftest/src/benchmark.rs
index 086d76f6..642aa01c 100644
--- a/optd-perftest/src/benchmark.rs
+++ b/optd-perftest/src/benchmark.rs
@@ -26,7 +26,7 @@ impl Benchmark {
         let dbname = match self {
             Self::Test => String::from("test"),
             Self::Tpch(tpch_config) => {
-                format!("tpch_sf{}_sd{}", tpch_config.scale_factor, tpch_config.seed)
+                format!("tpch_sf{}", tpch_config.scale_factor)
             }
         };
         // since Postgres names cannot contain periods
diff --git a/optd-perftest/src/cardtest.rs b/optd-perftest/src/cardtest.rs
index f3e8ca6e..276e26e0 100644
--- a/optd-perftest/src/cardtest.rs
+++ b/optd-perftest/src/cardtest.rs
@@ -1,8 +1,8 @@
 use std::collections::HashMap;
 use std::path::Path;
 
-use crate::postgres_db::PostgresDb;
-use crate::{benchmark::Benchmark, datafusion_db::DatafusionDb, tpch::TpchConfig};
+use crate::postgres_dbms::{PostgresDBMS, POSTGRES_DBMS_NAME};
+use crate::{benchmark::Benchmark, datafusion_dbms::DatafusionDBMS, tpch::TpchConfig};
 
 use anyhow::{self};
 use async_trait::async_trait;
@@ -32,14 +32,21 @@ impl CardtestRunner {
     ) -> anyhow::Result<HashMap<String, Vec<f64>>> {
         let mut qerrors_alldbs = HashMap::new();
 
+        // postgres runs faster and is less buggy so we use their true cardinalities
+        // in the future, it's probably a good idea to get the truecards of datafusion to ensure that they match
+        let pg_dbms = self
+            .dbmss
+            .iter_mut()
+            .find(|dbms| dbms.get_name() == POSTGRES_DBMS_NAME)
+            .unwrap();
+        let pg_truecards = pg_dbms.eval_benchmark_truecards(benchmark).await?;
+
         for dbms in &mut self.dbmss {
             let estcards = dbms.eval_benchmark_estcards(benchmark).await?;
-            let truecards = dbms.eval_benchmark_truecards(benchmark).await?;
-            assert!(truecards.len() == estcards.len());
             let qerrors = estcards
                 .into_iter()
-                .zip(truecards.into_iter())
-                .map(|(estcard, truecard)| CardtestRunner::calc_qerror(estcard, truecard))
+                .zip(pg_truecards.iter())
+                .map(|(estcard, truecard)| CardtestRunner::calc_qerror(estcard, *truecard))
                 .collect();
             qerrors_alldbs.insert(String::from(dbms.get_name()), qerrors);
         }
@@ -95,9 +102,9 @@ pub async fn cardtest<P: AsRef<Path> + Clone>(
     pgpassword: &str,
     tpch_config: TpchConfig,
 ) -> anyhow::Result<HashMap<String, Vec<f64>>> {
-    let pg_db = PostgresDb::new(workspace_dpath.clone(), pguser, pgpassword);
-    let df_db = DatafusionDb::new(workspace_dpath).await?;
-    let dbmss: Vec<Box<dyn CardtestRunnerDBMSHelper>> = vec![Box::new(pg_db), Box::new(df_db)];
+    let pg_dbms = PostgresDBMS::build(workspace_dpath.clone(), pguser, pgpassword)?;
+    let df_dbms = DatafusionDBMS::new(workspace_dpath).await?;
+    let dbmss: Vec<Box<dyn CardtestRunnerDBMSHelper>> = vec![Box::new(pg_dbms), Box::new(df_dbms)];
 
     let tpch_benchmark = Benchmark::Tpch(tpch_config.clone());
     let mut cardtest_runner = CardtestRunner::new(dbmss).await?;
diff --git a/optd-perftest/src/datafusion_db.rs b/optd-perftest/src/datafusion_dbms.rs
similarity index 98%
rename from optd-perftest/src/datafusion_db.rs
rename to optd-perftest/src/datafusion_dbms.rs
index eee19a38..25acc7d3 100644
--- a/optd-perftest/src/datafusion_db.rs
+++ b/optd-perftest/src/datafusion_dbms.rs
@@ -29,13 +29,13 @@ use optd_datafusion_bridge::{DatafusionCatalog, OptdQueryPlanner};
 use optd_datafusion_repr::{cost::BaseTableStats, cost::PerTableStats, DatafusionOptimizer};
 use regex::Regex;
 
-pub struct DatafusionDb {
+pub struct DatafusionDBMS {
     workspace_dpath: PathBuf,
     ctx: SessionContext,
 }
 
 #[async_trait]
-impl CardtestRunnerDBMSHelper for DatafusionDb {
+impl CardtestRunnerDBMSHelper for DatafusionDBMS {
     fn get_name(&self) -> &str {
         "DataFusion"
     }
@@ -63,9 +63,9 @@ impl CardtestRunnerDBMSHelper for DatafusionDb {
     }
 }
 
-impl DatafusionDb {
+impl DatafusionDBMS {
     pub async fn new<P: AsRef<Path>>(workspace_dpath: P) -> anyhow::Result<Self> {
-        Ok(DatafusionDb {
+        Ok(DatafusionDBMS {
             workspace_dpath: workspace_dpath.as_ref().to_path_buf(),
             ctx: Self::new_session_ctx(None).await?,
         })
@@ -319,4 +319,4 @@ impl DatafusionDb {
     }
 }
 
-unsafe impl Send for DatafusionDb {}
+unsafe impl Send for DatafusionDBMS {}
diff --git a/optd-perftest/src/lib.rs b/optd-perftest/src/lib.rs
index 96da0d56..af739db9 100644
--- a/optd-perftest/src/lib.rs
+++ b/optd-perftest/src/lib.rs
@@ -1,6 +1,7 @@
 mod benchmark;
 pub mod cardtest;
-mod datafusion_db;
-mod postgres_db;
+mod datafusion_dbms;
+mod postgres_dbms;
 pub mod shell;
 pub mod tpch;
+mod truecard_cache;
diff --git a/optd-perftest/src/main.rs b/optd-perftest/src/main.rs
index 536d1905..da22f6db 100644
--- a/optd-perftest/src/main.rs
+++ b/optd-perftest/src/main.rs
@@ -47,6 +47,11 @@ enum Commands {
     },
 }
 
+// q-errors are always >= 1.0 so two decimal points is enough
+fn fmt_qerror(qerror: f64) -> String {
+    format!("{:.2}", qerror)
+}
+
 #[tokio::main]
 async fn main() -> anyhow::Result<()> {
     env_logger::init();
@@ -73,15 +78,11 @@ async fn main() -> anyhow::Result<()> {
             };
             let qerrors_alldbs =
                 cardtest::cardtest(&workspace_dpath, &pguser, &pgpassword, tpch_config).await?;
+            println!();
             println!(" Aggregate Q-Error Comparison");
             let mut agg_qerror_table = Table::new();
             agg_qerror_table.set_titles(prettytable::row![
-                "DBMS",
-                "Median",
-                "# Infinite",
-                "Mean",
-                "Min",
-                "Max"
+                "DBMS", "Median", "# Inf", "Mean", "Min", "Max"
             ]);
             for (dbms, qerrors) in &qerrors_alldbs {
                 if !qerrors.is_empty() {
@@ -93,22 +94,22 @@
                     let ninf_qerrors = qerrors.len() - finite_qerrors.len();
                     let mean_qerror =
                         finite_qerrors.iter().sum::<f64>() / finite_qerrors.len() as f64;
-                    let min_qerror = finite_qerrors
+                    let min_qerror = qerrors
                         .iter()
                         .min_by(|a, b| a.partial_cmp(b).unwrap())
                         .unwrap();
                     let median_qerror = statistical::median(qerrors);
-                    let max_qerror = finite_qerrors
+                    let max_qerror = qerrors
                         .iter()
                         .max_by(|a, b| a.partial_cmp(b).unwrap())
                         .unwrap();
                     agg_qerror_table.add_row(prettytable::row![
                         dbms,
-                        median_qerror,
+                        fmt_qerror(median_qerror),
                         ninf_qerrors,
-                        mean_qerror,
-                        min_qerror,
-                        max_qerror
+                        fmt_qerror(mean_qerror),
+                        fmt_qerror(*min_qerror),
+                        fmt_qerror(*max_qerror),
                     ]);
                 } else {
                     agg_qerror_table
@@ -119,6 +120,7 @@
             agg_qerror_table.printstd();
 
             let mut per_query_qerror_table = Table::new();
+            println!();
             println!(" Per-Query Q-Error Comparison");
             let title_cells = iter::once(Cell::new("Query #"))
                 .chain(qerrors_alldbs.keys().map(|dbms| Cell::new(dbms)))
@@ -129,7 +131,7 @@
                 row_cells.push(prettytable::cell!(query_id));
                 for qerrors in qerrors_alldbs.values() {
                     let qerror = qerrors.get(i).unwrap();
-                    row_cells.push(prettytable::cell!(qerror));
+                    row_cells.push(prettytable::cell!(fmt_qerror(*qerror)));
                 }
                 per_query_qerror_table.add_row(Row::new(row_cells));
             }
diff --git a/optd-perftest/src/postgres_db.rs b/optd-perftest/src/postgres_dbms.rs
similarity index 84%
rename from optd-perftest/src/postgres_db.rs
rename to optd-perftest/src/postgres_dbms.rs
index a801018f..f9e58183 100644
--- a/optd-perftest/src/postgres_db.rs
+++ b/optd-perftest/src/postgres_dbms.rs
@@ -2,6 +2,7 @@ use crate::{
     benchmark::Benchmark,
     cardtest::CardtestRunnerDBMSHelper,
     tpch::{TpchConfig, TpchKit},
+    truecard_cache::DBMSTruecardCache,
 };
 use async_trait::async_trait;
 use futures::Sink;
@@ -17,13 +18,17 @@ use tokio::fs::File;
 use tokio::io::AsyncReadExt;
 use tokio_postgres::{Client, NoTls, Row};
 
+/// The name of the Postgres DBMS (as opposed to the DataFusion DBMS for instance)
+pub const POSTGRES_DBMS_NAME: &str = "Postgres";
+
 /// This dbname is assumed to always exist
 const DEFAULT_DBNAME: &str = "postgres";
 
-pub struct PostgresDb {
+pub struct PostgresDBMS {
     workspace_dpath: PathBuf,
     pguser: String,
     pgpassword: String,
+    truecard_cache: DBMSTruecardCache,
 }
 
 /// Conventions I keep for methods of this class:
@@ -31,13 +36,21 @@ pub struct PostgresDb {
 /// - For instance, this is why "createdb" is _not_ a function
 /// - Stop and start functions should be separate
 /// - Setup should be done in build() unless it requires more information (like benchmark)
-impl PostgresDb {
-    pub fn new<P: AsRef<Path>>(workspace_dpath: P, pguser: &str, pgpassword: &str) -> Self {
-        Self {
-            workspace_dpath: PathBuf::from(workspace_dpath.as_ref()),
+impl PostgresDBMS {
+    pub fn build<P: AsRef<Path>>(
+        workspace_dpath: P,
+        pguser: &str,
+        pgpassword: &str,
+    ) -> anyhow::Result<Self> {
+        let workspace_dpath = PathBuf::from(workspace_dpath.as_ref());
+        let truecard_cache = DBMSTruecardCache::build(&workspace_dpath, POSTGRES_DBMS_NAME)?;
+        let pg_dbms = Self {
+            workspace_dpath,
             pguser: String::from(pguser),
             pgpassword: String::from(pgpassword),
-        }
+            truecard_cache,
+        };
+        Ok(pg_dbms)
     }
 
     /// Create a connection to a Postgres database
@@ -145,6 +158,11 @@ impl PostgresDb {
             Self::copy_from_stdin(client, tbl_fpath).await?;
         }
 
+        // create stats
+        // you need to do VACUUM FULL ANALYZE and not just ANALYZE to make sure the stats are created in a deterministic way
+        // this is standard practice for postgres benchmarking
+        client.query("VACUUM FULL ANALYZE", &[]).await?;
+
         Ok(())
     }
 
@@ -178,9 +196,9 @@ impl PostgresDb {
 }
 
 #[async_trait]
-impl CardtestRunnerDBMSHelper for PostgresDb {
+impl CardtestRunnerDBMSHelper for PostgresDBMS {
     fn get_name(&self) -> &str {
-        "Postgres"
+        POSTGRES_DBMS_NAME
     }
 
     async fn eval_benchmark_estcards(
@@ -205,13 +223,16 @@ impl CardtestRunnerDBMSHelper for PostgresDb {
         let client = self.connect_to_db(&dbname).await?;
         match benchmark {
             Benchmark::Test => unimplemented!(),
-            Benchmark::Tpch(tpch_config) => self.eval_tpch_truecards(&client, tpch_config).await,
+            Benchmark::Tpch(tpch_config) => {
+                self.eval_tpch_truecards(&client, tpch_config, &dbname)
+                    .await
+            }
         }
     }
 }
 
-/// This impl has helpers for ```impl CardtestRunnerDBMSHelper for PostgresDb```
-impl PostgresDb {
+/// This impl has helpers for ```impl CardtestRunnerDBMSHelper for PostgresDBMS```
+impl PostgresDBMS {
     async fn eval_tpch_estcards(
         &self,
         client: &Client,
@@ -231,9 +252,10 @@ impl PostgresDb {
     }
 
     async fn eval_tpch_truecards(
-        &self,
+        &mut self,
         client: &Client,
         tpch_config: &TpchConfig,
+        dbname: &str, // used by truecard_cache
     ) -> anyhow::Result<Vec<usize>> {
         let tpch_kit = TpchKit::build(&self.workspace_dpath)?;
         tpch_kit.gen_queries(tpch_config)?;
@@ -241,7 +263,14 @@ impl PostgresDb {
         let mut truecards = vec![];
         for sql_fpath in tpch_kit.get_sql_fpath_ordered_iter(tpch_config)? {
             let sql = fs::read_to_string(sql_fpath)?;
-            let truecard = self.eval_query_truecard(client, &sql).await?;
+            let truecard = match self.truecard_cache.get_truecard(dbname, &sql) {
+                Some(truecard) => truecard,
+                None => {
+                    let truecard = self.eval_query_truecard(client, &sql).await?;
+                    self.truecard_cache.insert_truecard(dbname, &sql, truecard);
+                    truecard
+                }
+            };
             truecards.push(truecard);
         }
 
@@ -259,7 +288,7 @@ impl PostgresDb {
         self.log_explain(&explain_rows);
         // the first line contains the explain of the root node
         let first_explain_line: &str = explain_rows.first().unwrap().get(0);
-        let estcard = PostgresDb::extract_row_count(first_explain_line).unwrap();
+        let estcard = PostgresDBMS::extract_row_count(first_explain_line).unwrap();
         Ok(estcard)
     }
 
diff --git a/optd-perftest/src/tpch.rs b/optd-perftest/src/tpch.rs
index 61e592ad..5d12fe27 100644
--- a/optd-perftest/src/tpch.rs
+++ b/optd-perftest/src/tpch.rs
@@ -49,7 +49,7 @@ pub struct TpchKit {
     pub schema_fpath: PathBuf,
 }
 
-/// I keep the same conventions for these methods as I do for PostgresDb
+/// I keep the same conventions for these methods as I do for PostgresDBMS
 impl TpchKit {
     pub fn build<P: AsRef<Path>>(workspace_dpath: P) -> io::Result<Self> {
         log::debug!("[start] building TpchKit");
@@ -193,10 +193,7 @@ impl TpchKit {
     /// If two TpchConfig instances would *not always* generate the same data, then their
     /// directory names must be different.
     fn get_this_genned_tables_dpath(&self, tpch_config: &TpchConfig) -> PathBuf {
-        let dname = format!(
-            "db{}_sf{}_sd{}",
-            tpch_config.dbms, tpch_config.scale_factor, tpch_config.seed
-        );
+        let dname = format!("db{}_sf{}", tpch_config.dbms, tpch_config.scale_factor,);
         self.genned_tables_dpath.join(dname)
     }
 
diff --git a/optd-perftest/src/truecard_cache.rs b/optd-perftest/src/truecard_cache.rs
index ee0bd2a4..6cfda34a 100644
--- a/optd-perftest/src/truecard_cache.rs
+++ b/optd-perftest/src/truecard_cache.rs
@@ -1,13 +1,80 @@
+use std::{
+    collections::HashMap,
+    fs::{self, File},
+    path::{Path, PathBuf},
+};
+
 /// A cache that gets persisted to disk for the true cardinalities of queries on a DBMS
-struct TruecardCache {
+/// It's difficult to share the same cache (i.e. having a three-level hash map of DBMS -> db -> sql ->
+/// truecard) between multiple DBMSs because then multiple different Rust objects could be writing to
+/// the same cache file.
+/// It's possible to have one cache per database per DBMS instead of one per DBMS, but it just felt
+/// cleaner to me to tie the lifetime of the cache with the lifetime of the DBMS object. If you
+/// did one cache per database per DBMS, those caches would still be tied to the lifetime of the
+/// DBMS object.
+/// Note that only the dbms_cache field gets persisted
+pub struct DBMSTruecardCache {
+    workspace_dpath: PathBuf,
     // The DBMS the queries of this cache were executed on
     dbms_name: String,
-    // The cache from dbname -> sql -> cardinality
+    // The cache from dbname -> sql -> true cardinality
     // Note that dbname is the database _within_ the DBMS that sql was executed on
-    cache: HashMap<String, HashMap<String, usize>>,
+    dbms_cache: HashMap<String, HashMap<String, usize>>,
 }
 
-// TODO(phw2): write function that takes in a Benchmark and creates a new entry in the cache
-impl TruecardCache {
-
-}
\ No newline at end of file
+impl DBMSTruecardCache {
+    fn get_serialized_fpath<P: AsRef<Path>>(workspace_dpath: P, dbms_name: &str) -> PathBuf {
+        workspace_dpath
+            .as_ref()
+            .join("truecard_caches")
+            .join(dbms_name)
+    }
+
+    pub fn build<P: AsRef<Path>>(workspace_dpath: P, dbms_name: &str) -> anyhow::Result<Self> {
+        let serialized_fpath = Self::get_serialized_fpath(&workspace_dpath, dbms_name);
+        let dbms_cache = if serialized_fpath.exists() {
+            let file = File::open(serialized_fpath)?;
+            serde_json::from_reader(file)?
+        } else {
+            HashMap::new()
+        };
+
+        Ok(Self {
+            workspace_dpath: PathBuf::from(workspace_dpath.as_ref()),
+            dbms_name: String::from(dbms_name),
+            dbms_cache,
+        })
+    }
+
+    pub fn insert_truecard(&mut self, dbname: &str, sql: &str, truecard: usize) {
+        let db_cache = match self.dbms_cache.get_mut(dbname) {
+            Some(db_cache) => db_cache,
+            None => {
+                self.dbms_cache.insert(String::from(dbname), HashMap::new());
+                self.dbms_cache.get_mut(dbname).unwrap()
+            }
+        };
+        db_cache.insert(String::from(sql), truecard);
+    }
+
+    pub fn get_truecard(&self, dbname: &str, sql: &str) -> Option<usize> {
+        self.dbms_cache
+            .get(dbname)
+            .and_then(|db_cache| db_cache.get(sql).copied())
+    }
+
+    pub fn save(&self) -> anyhow::Result<()> {
+        let serialized_fpath = Self::get_serialized_fpath(&self.workspace_dpath, &self.dbms_name);
+        fs::create_dir_all(serialized_fpath.parent().unwrap())?;
+        // this will create a new file or truncate the file if it already exists
+        let file = File::create(serialized_fpath)?;
+        serde_json::to_writer(file, &self.dbms_cache)?;
+        Ok(())
+    }
+}
+
+impl Drop for DBMSTruecardCache {
+    fn drop(&mut self) {
+        self.save().unwrap();
+    }
+}
diff --git a/optd-sqlplannertest/src/bin/planner_test_apply.rs b/optd-sqlplannertest/src/bin/planner_test_apply.rs
index 50608bf4..c7d91c8d 100644
--- a/optd-sqlplannertest/src/bin/planner_test_apply.rs
+++ b/optd-sqlplannertest/src/bin/planner_test_apply.rs
@@ -6,7 +6,7 @@ use anyhow::Result;
 async fn main() -> Result<()> {
     sqlplannertest::planner_test_apply(
         Path::new(env!("CARGO_MANIFEST_DIR")).join("tests"),
-        || async { optd_sqlplannertest::DatafusionDb::new().await },
+        || async { optd_sqlplannertest::DatafusionDBMS::new().await },
     )
     .await?;
     Ok(())
diff --git a/optd-sqlplannertest/src/lib.rs b/optd-sqlplannertest/src/lib.rs
index d63bcf75..2dbd6f54 100644
--- a/optd-sqlplannertest/src/lib.rs
+++ b/optd-sqlplannertest/src/lib.rs
@@ -22,17 +22,17 @@ use anyhow::{Context, Result};
 use async_trait::async_trait;
 
 #[derive(Default)]
-pub struct DatafusionDb {
+pub struct DatafusionDBMS {
     ctx: SessionContext,
     /// Context enabling datafusion's logical optimizer.
     with_logical_ctx: SessionContext,
 }
 
-impl DatafusionDb {
+impl DatafusionDBMS {
     pub async fn new() -> Result<Self> {
-        let ctx = DatafusionDb::new_session_ctx(false, None).await?;
+        let ctx = DatafusionDBMS::new_session_ctx(false, None).await?;
         let with_logical_ctx =
-            DatafusionDb::new_session_ctx(true, Some(ctx.state().catalog_list().clone())).await?;
+            DatafusionDBMS::new_session_ctx(true, Some(ctx.state().catalog_list().clone())).await?;
         Ok(Self {
             ctx,
             with_logical_ctx,
@@ -223,7 +223,7 @@ impl DatafusionDb {
 }
 
 #[async_trait]
-impl sqlplannertest::PlannerTestRunner for DatafusionDb {
+impl sqlplannertest::PlannerTestRunner for DatafusionDBMS {
     async fn run(&mut self, test_case: &sqlplannertest::ParsedTestCase) -> Result<String> {
         for before in &test_case.before_sql {
             self.execute(before, true)
diff --git a/optd-sqlplannertest/tests/planner_test.rs b/optd-sqlplannertest/tests/planner_test.rs
index f870ce7c..5aa2476d 100644
--- a/optd-sqlplannertest/tests/planner_test.rs
+++ b/optd-sqlplannertest/tests/planner_test.rs
@@ -5,7 +5,7 @@ use anyhow::Result;
 fn main() -> Result<()> {
     sqlplannertest::planner_test_runner(
         Path::new(env!("CARGO_MANIFEST_DIR")).join("tests"),
-        || async { optd_sqlplannertest::DatafusionDb::new().await },
+        || async { optd_sqlplannertest::DatafusionDBMS::new().await },
     )?;
     Ok(())
 }
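
Note on the q-error numbers this patch formats: `CardtestRunner::calc_qerror` itself is not part of this diff, but the new `fmt_qerror` helper relies on q-errors always being >= 1.0. For reference, the standard q-error definition looks roughly like the sketch below. This is a hypothetical stand-in (name, signature, and the assumption that cardinalities are plain counts are mine), not the project's actual implementation.

```rust
/// Minimal sketch of the usual q-error definition.
/// NOT CardtestRunner::calc_qerror, whose body lies outside this diff.
fn calc_qerror_sketch(estcard: usize, truecard: usize) -> f64 {
    let est = estcard as f64;
    let truth = truecard as f64;
    // q-error is the factor by which the estimate is off, so it is symmetric
    // and always >= 1.0; a zero cardinality on either side divides by zero and
    // yields +inf, which is why main.rs counts infinite q-errors in a separate
    // "# Inf" column instead of folding them into the mean.
    f64::max(est / truth, truth / est)
}
```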
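The new `DBMSTruecardCache` is what lets repeated `cardtest` runs skip re-executing TPC-H queries whose true cardinalities are already known. A minimal usage sketch of the API added in `truecard_cache.rs` follows; the module is private to `optd-perftest`, and the workspace path, database name, SQL string, and cardinality below are placeholders.

```rust
fn truecard_cache_example() -> anyhow::Result<()> {
    // One cache per DBMS; it is serialized to <workspace>/truecard_caches/Postgres.
    let mut cache = DBMSTruecardCache::build("cardtest_workspace", "Postgres")?;

    let dbname = "tpch_sf1";
    let sql = "SELECT COUNT(*) FROM lineitem;";

    // Get-or-compute, mirroring PostgresDBMS::eval_tpch_truecards in this patch.
    let truecard = match cache.get_truecard(dbname, sql) {
        Some(truecard) => truecard, // cache hit: the query is not re-run
        None => {
            let truecard = 42; // placeholder for eval_query_truecard(...)
            cache.insert_truecard(dbname, sql, truecard);
            truecard
        }
    };
    assert_eq!(cache.get_truecard(dbname, sql), Some(truecard));

    // Dropping the cache (or calling save()) writes dbms_cache to disk as JSON.
    Ok(())
}
```

Because the `Drop` impl calls `save().unwrap()`, the cache is persisted whenever the owning DBMS object goes out of scope; a failed write at that point panics rather than being silently ignored.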