
Commit

feat: caching true cardinalities, 22x speedup for TPC-H SF1 (#124)
**Summary**: Instead of executing every query each time we run cardtest (which takes multiple minutes even at scale factor 1.0), we cache the true cardinalities from the first execution and reuse them on subsequent runs (a sketch of the cache shape follows the details below).

**Demo**:
22x speedup (29m55s -> 1m21s) on TPC-H scale factor 1

![Screenshot 2024-03-21 at 21 06 10](https://github.com/cmu-db/optd/assets/20631215/ccd4d117-42c9-4420-a3dc-b4137a18a2ea)
![Screenshot 2024-03-21 at 21 05 32](https://github.com/cmu-db/optd/assets/20631215/a694a566-3b80-4ed3-8394-d27d54e5b7f2)


JSON cache file:
![Screenshot 2024-03-21 at 19 00 09](https://github.com/cmu-db/optd/assets/20631215/374a7a1c-d6b7-495f-86bb-233c3629abe9)

**Details**:
* We now call `VACUUM FULL ANALYZE` in Postgres right after loading data
to create stats, which we weren't doing before.
* Because Postgres is faster and less buggy than optd+DataFusion, we
only get truecards from Postgres.
* Fixed bug where `dbname` and `tables/` were parameterized by the seed,
which would make us miss caching opportunities.
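
As a rough illustration of what gets cached, below is a minimal sketch of a per-DBMS true-cardinality cache in the spirit of the `DBMSTruecardCache` used in the diff. The `build`, `get_truecard`, and `insert_truecard` signatures mirror how the cache is called from `postgres_dbms.rs`; the cache file name, the exact JSON layout, and the persist-on-drop behavior are assumptions for illustration, since `truecard_cache.rs` itself is not rendered on this page.

```rust
use std::collections::HashMap;
use std::fs;
use std::path::{Path, PathBuf};

/// Sketch only: a per-DBMS cache of true cardinalities, serialized as JSON,
/// keyed by dbname (e.g. "tpch_sf1") and then by the SQL text of each query.
pub struct DBMSTruecardCache {
    cache_fpath: PathBuf,
    cache: HashMap<String, HashMap<String, usize>>,
}

impl DBMSTruecardCache {
    pub fn build<P: AsRef<Path>>(workspace_dpath: P, dbms_name: &str) -> anyhow::Result<Self> {
        // Assumed location/name of the JSON cache file; the real one may differ.
        let cache_fpath = workspace_dpath
            .as_ref()
            .join(format!("{}_truecard_cache.json", dbms_name));
        // Reuse the cardinalities from a previous run if the file already exists.
        let cache = if cache_fpath.exists() {
            serde_json::from_str(&fs::read_to_string(&cache_fpath)?)?
        } else {
            HashMap::new()
        };
        Ok(Self { cache_fpath, cache })
    }

    pub fn get_truecard(&self, dbname: &str, sql: &str) -> Option<usize> {
        self.cache.get(dbname).and_then(|m| m.get(sql)).copied()
    }

    pub fn insert_truecard(&mut self, dbname: &str, sql: &str, truecard: usize) {
        self.cache
            .entry(dbname.to_string())
            .or_default()
            .insert(sql.to_string(), truecard);
    }
}

impl Drop for DBMSTruecardCache {
    // Persist the cache so the next cardtest run can skip executing the queries.
    fn drop(&mut self) {
        if let Ok(json) = serde_json::to_string_pretty(&self.cache) {
            let _ = fs::write(&self.cache_fpath, json);
        }
    }
}
```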

---------

Co-authored-by: Zhidong Guo <[email protected]>
wangpatrick57 and Gun9niR authored Mar 22, 2024
1 parent 368bcd6 commit 9dfbb47
Showing 14 changed files with 169 additions and 64 deletions.
2 changes: 1 addition & 1 deletion .gitignore
@@ -3,4 +3,4 @@
/.DS_Store
/.idea
.history
**/*_workspace/**
**/*_workspace/
1 change: 1 addition & 0 deletions Cargo.lock


1 change: 1 addition & 0 deletions optd-perftest/Cargo.toml
@@ -41,6 +41,7 @@ tokio-util = "0.7"
futures-util = "0.3"
statistical = "1.0"
prettytable-rs = "0.10"
serde_json = "1.0"

[dev_dependencies]
assert_cmd = "2.0"
2 changes: 1 addition & 1 deletion optd-perftest/src/benchmark.rs
@@ -26,7 +26,7 @@ impl Benchmark {
let dbname = match self {
Self::Test => String::from("test"),
Self::Tpch(tpch_config) => {
format!("tpch_sf{}_sd{}", tpch_config.scale_factor, tpch_config.seed)
format!("tpch_sf{}", tpch_config.scale_factor)
}
};
// since Postgres names cannot contain periods
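Note that the `dbname` built above still contains a period for fractional scale factors (e.g. `tpch_sf0.1`), which the trailing comment says Postgres cannot accept; the sanitization happens just below the lines shown. Purely as a hypothetical illustration of that step (the real replacement token is not visible in this diff):

```rust
fn main() {
    // Hypothetical only: drop the '.' that a fractional scale factor introduces,
    // since Postgres database names cannot contain periods.
    let dbname = format!("tpch_sf{}", 0.1).replace('.', "point");
    assert_eq!(dbname, "tpch_sf0point1");
}
```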
25 changes: 16 additions & 9 deletions optd-perftest/src/cardtest.rs
@@ -1,8 +1,8 @@
use std::collections::HashMap;
use std::path::Path;

use crate::postgres_db::PostgresDb;
use crate::{benchmark::Benchmark, datafusion_db::DatafusionDb, tpch::TpchConfig};
use crate::postgres_dbms::{PostgresDBMS, POSTGRES_DBMS_NAME};
use crate::{benchmark::Benchmark, datafusion_dbms::DatafusionDBMS, tpch::TpchConfig};

use anyhow::{self};
use async_trait::async_trait;
@@ -32,14 +32,21 @@ impl CardtestRunner {
) -> anyhow::Result<HashMap<String, Vec<f64>>> {
let mut qerrors_alldbs = HashMap::new();

// postgres runs faster and is less buggy so we use their true cardinalities
// in the future, it's probably a good idea to get the truecards of datafusion to ensure that they match
let pg_dbms = self
.dbmss
.iter_mut()
.find(|dbms| dbms.get_name() == POSTGRES_DBMS_NAME)
.unwrap();
let pg_truecards = pg_dbms.eval_benchmark_truecards(benchmark).await?;

for dbms in &mut self.dbmss {
let estcards = dbms.eval_benchmark_estcards(benchmark).await?;
let truecards = dbms.eval_benchmark_truecards(benchmark).await?;
assert!(truecards.len() == estcards.len());
let qerrors = estcards
.into_iter()
.zip(truecards.into_iter())
.map(|(estcard, truecard)| CardtestRunner::calc_qerror(estcard, truecard))
.zip(pg_truecards.iter())
.map(|(estcard, truecard)| CardtestRunner::calc_qerror(estcard, *truecard))
.collect();
qerrors_alldbs.insert(String::from(dbms.get_name()), qerrors);
}
@@ -95,9 +102,9 @@ pub async fn cardtest<P: AsRef<Path> + Clone>(
pgpassword: &str,
tpch_config: TpchConfig,
) -> anyhow::Result<HashMap<String, Vec<f64>>> {
let pg_db = PostgresDb::new(workspace_dpath.clone(), pguser, pgpassword);
let df_db = DatafusionDb::new(workspace_dpath).await?;
let dbmss: Vec<Box<dyn CardtestRunnerDBMSHelper>> = vec![Box::new(pg_db), Box::new(df_db)];
let pg_dbms = PostgresDBMS::build(workspace_dpath.clone(), pguser, pgpassword)?;
let df_dbms = DatafusionDBMS::new(workspace_dpath).await?;
let dbmss: Vec<Box<dyn CardtestRunnerDBMSHelper>> = vec![Box::new(pg_dbms), Box::new(df_dbms)];

let tpch_benchmark = Benchmark::Tpch(tpch_config.clone());
let mut cardtest_runner = CardtestRunner::new(dbmss).await?;
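For reference, the q-error computed by `CardtestRunner::calc_qerror` above is conventionally the symmetric ratio between estimated and true cardinality. The function body is outside the lines shown here; a minimal sketch consistent with how it is used (and with the "q-errors are always >= 1.0" comment in `main.rs` below) would be:

```rust
// Sketch only: q-error = max(est/true, true/est), so it is always >= 1.0,
// and it becomes infinite when exactly one of the two cardinalities is zero
// (main.rs counts those cases in the "# Inf" column).
fn calc_qerror(estcard: usize, truecard: usize) -> f64 {
    let estcard = estcard as f64;
    let truecard = truecard as f64;
    f64::max(estcard / truecard, truecard / estcard)
}
```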
optd-perftest/src/datafusion_db.rs → optd-perftest/src/datafusion_dbms.rs
@@ -29,13 +29,13 @@ use optd_datafusion_bridge::{DatafusionCatalog, OptdQueryPlanner};
use optd_datafusion_repr::{cost::BaseTableStats, cost::PerTableStats, DatafusionOptimizer};
use regex::Regex;

pub struct DatafusionDb {
pub struct DatafusionDBMS {
workspace_dpath: PathBuf,
ctx: SessionContext,
}

#[async_trait]
impl CardtestRunnerDBMSHelper for DatafusionDb {
impl CardtestRunnerDBMSHelper for DatafusionDBMS {
fn get_name(&self) -> &str {
"DataFusion"
}
@@ -63,9 +63,9 @@ impl CardtestRunnerDBMSHelper for DatafusionDb {
}
}

impl DatafusionDb {
impl DatafusionDBMS {
pub async fn new<P: AsRef<Path>>(workspace_dpath: P) -> anyhow::Result<Self> {
Ok(DatafusionDb {
Ok(DatafusionDBMS {
workspace_dpath: workspace_dpath.as_ref().to_path_buf(),
ctx: Self::new_session_ctx(None).await?,
})
@@ -319,4 +319,4 @@ impl DatafusionDb {
}
}

unsafe impl Send for DatafusionDb {}
unsafe impl Send for DatafusionDBMS {}
5 changes: 3 additions & 2 deletions optd-perftest/src/lib.rs
@@ -1,6 +1,7 @@
mod benchmark;
pub mod cardtest;
mod datafusion_db;
mod postgres_db;
mod datafusion_dbms;
mod postgres_dbms;
pub mod shell;
pub mod tpch;
mod truecard_cache;
28 changes: 15 additions & 13 deletions optd-perftest/src/main.rs
@@ -47,6 +47,11 @@ enum Commands {
},
}

// q-errors are always >= 1.0 so two decimal points is enough
fn fmt_qerror(qerror: f64) -> String {
format!("{:.2}", qerror)
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
env_logger::init();
@@ -73,15 +78,11 @@ async fn main() -> anyhow::Result<()> {
};
let qerrors_alldbs =
cardtest::cardtest(&workspace_dpath, &pguser, &pgpassword, tpch_config).await?;
println!();
println!(" Aggregate Q-Error Comparison");
let mut agg_qerror_table = Table::new();
agg_qerror_table.set_titles(prettytable::row![
"DBMS",
"Median",
"# Infinite",
"Mean",
"Min",
"Max"
"DBMS", "Median", "# Inf", "Mean", "Min", "Max"
]);
for (dbms, qerrors) in &qerrors_alldbs {
if !qerrors.is_empty() {
@@ -93,22 +94,22 @@ async fn main() -> anyhow::Result<()> {
let ninf_qerrors = qerrors.len() - finite_qerrors.len();
let mean_qerror =
finite_qerrors.iter().sum::<f64>() / finite_qerrors.len() as f64;
let min_qerror = finite_qerrors
let min_qerror = qerrors
.iter()
.min_by(|a, b| a.partial_cmp(b).unwrap())
.unwrap();
let median_qerror = statistical::median(qerrors);
let max_qerror = finite_qerrors
let max_qerror = qerrors
.iter()
.max_by(|a, b| a.partial_cmp(b).unwrap())
.unwrap();
agg_qerror_table.add_row(prettytable::row![
dbms,
median_qerror,
fmt_qerror(median_qerror),
ninf_qerrors,
mean_qerror,
min_qerror,
max_qerror
fmt_qerror(mean_qerror),
fmt_qerror(*min_qerror),
fmt_qerror(*max_qerror),
]);
} else {
agg_qerror_table
@@ -119,6 +120,7 @@ async fn main() -> anyhow::Result<()> {
agg_qerror_table.printstd();

let mut per_query_qerror_table = Table::new();
println!();
println!(" Per-Query Q-Error Comparison");
let title_cells = iter::once(Cell::new("Query #"))
.chain(qerrors_alldbs.keys().map(|dbms| Cell::new(dbms)))
@@ -129,7 +131,7 @@ async fn main() -> anyhow::Result<()> {
row_cells.push(prettytable::cell!(query_id));
for qerrors in qerrors_alldbs.values() {
let qerror = qerrors.get(i).unwrap();
row_cells.push(prettytable::cell!(qerror));
row_cells.push(prettytable::cell!(fmt_qerror(*qerror)));
}
per_query_qerror_table.add_row(Row::new(row_cells));
}
optd-perftest/src/postgres_db.rs → optd-perftest/src/postgres_dbms.rs
@@ -2,6 +2,7 @@ use crate::{
benchmark::Benchmark,
cardtest::CardtestRunnerDBMSHelper,
tpch::{TpchConfig, TpchKit},
truecard_cache::DBMSTruecardCache,
};
use async_trait::async_trait;
use futures::Sink;
@@ -17,27 +18,39 @@ use tokio::fs::File;
use tokio::io::AsyncReadExt;
use tokio_postgres::{Client, NoTls, Row};

/// The name of the Postgres DBMS (as opposed to the DataFusion DBMS for instance)
pub const POSTGRES_DBMS_NAME: &str = "Postgres";

/// This dbname is assumed to always exist
const DEFAULT_DBNAME: &str = "postgres";

pub struct PostgresDb {
pub struct PostgresDBMS {
workspace_dpath: PathBuf,
pguser: String,
pgpassword: String,
truecard_cache: DBMSTruecardCache,
}

/// Conventions I keep for methods of this class:
/// - Functions should be idempotent. For instance, start_postgres() should not fail if Postgres is already running
/// - For instance, this is why "createdb" is _not_ a function
/// - Stop and start functions should be separate
/// - Setup should be done in build() unless it requires more information (like benchmark)
impl PostgresDb {
pub fn new<P: AsRef<Path>>(workspace_dpath: P, pguser: &str, pgpassword: &str) -> Self {
Self {
workspace_dpath: PathBuf::from(workspace_dpath.as_ref()),
impl PostgresDBMS {
pub fn build<P: AsRef<Path>>(
workspace_dpath: P,
pguser: &str,
pgpassword: &str,
) -> anyhow::Result<Self> {
let workspace_dpath = PathBuf::from(workspace_dpath.as_ref());
let truecard_cache = DBMSTruecardCache::build(&workspace_dpath, POSTGRES_DBMS_NAME)?;
let pg_dbms = Self {
workspace_dpath,
pguser: String::from(pguser),
pgpassword: String::from(pgpassword),
}
truecard_cache,
};
Ok(pg_dbms)
}

/// Create a connection to a Postgres database
@@ -145,6 +158,11 @@ impl PostgresDb {
Self::copy_from_stdin(client, tbl_fpath).await?;
}

// create stats
// you need to do VACUUM FULL ANALYZE and not just ANALYZE to make sure the stats are created in a deterministic way
// this is standard practice for postgres benchmarking
client.query("VACUUM FULL ANALYZE", &[]).await?;

Ok(())
}

@@ -178,9 +196,9 @@ impl PostgresDb {
}

#[async_trait]
impl CardtestRunnerDBMSHelper for PostgresDb {
impl CardtestRunnerDBMSHelper for PostgresDBMS {
fn get_name(&self) -> &str {
"Postgres"
POSTGRES_DBMS_NAME
}

async fn eval_benchmark_estcards(
@@ -205,13 +223,16 @@ impl CardtestRunnerDBMSHelper for PostgresDb {
let client = self.connect_to_db(&dbname).await?;
match benchmark {
Benchmark::Test => unimplemented!(),
Benchmark::Tpch(tpch_config) => self.eval_tpch_truecards(&client, tpch_config).await,
Benchmark::Tpch(tpch_config) => {
self.eval_tpch_truecards(&client, tpch_config, &dbname)
.await
}
}
}
}

/// This impl has helpers for ```impl CardtestRunnerDBMSHelper for PostgresDb```
impl PostgresDb {
/// This impl has helpers for ```impl CardtestRunnerDBMSHelper for PostgresDBMS```
impl PostgresDBMS {
async fn eval_tpch_estcards(
&self,
client: &Client,
@@ -231,17 +252,25 @@ impl PostgresDb {
}

async fn eval_tpch_truecards(
&self,
&mut self,
client: &Client,
tpch_config: &TpchConfig,
dbname: &str, // used by truecard_cache
) -> anyhow::Result<Vec<usize>> {
let tpch_kit = TpchKit::build(&self.workspace_dpath)?;
tpch_kit.gen_queries(tpch_config)?;

let mut truecards = vec![];
for sql_fpath in tpch_kit.get_sql_fpath_ordered_iter(tpch_config)? {
let sql = fs::read_to_string(sql_fpath)?;
let truecard = self.eval_query_truecard(client, &sql).await?;
let truecard = match self.truecard_cache.get_truecard(dbname, &sql) {
Some(truecard) => truecard,
None => {
let truecard = self.eval_query_truecard(client, &sql).await?;
self.truecard_cache.insert_truecard(dbname, &sql, truecard);
truecard
}
};
truecards.push(truecard);
}

@@ -259,7 +288,7 @@ impl PostgresDb {
self.log_explain(&explain_rows);
// the first line contains the explain of the root node
let first_explain_line: &str = explain_rows.first().unwrap().get(0);
let estcard = PostgresDb::extract_row_count(first_explain_line).unwrap();
let estcard = PostgresDBMS::extract_row_count(first_explain_line).unwrap();
Ok(estcard)
}

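The `extract_row_count` helper used for `estcard` above parses the planner's estimated row count out of the first line of `EXPLAIN` output (a line of the form `... (cost=... rows=<n> width=...)`). Its body is not shown in this diff; a minimal sketch of the idea, using the `regex` crate this package already imports elsewhere, might look like:

```rust
use regex::Regex;

// Sketch only: pull "rows=<n>" out of the root plan node's EXPLAIN line.
fn extract_row_count(explain_line: &str) -> Option<usize> {
    let re = Regex::new(r"rows=(\d+)").ok()?;
    let caps = re.captures(explain_line)?;
    caps.get(1)?.as_str().parse::<usize>().ok()
}
```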
7 changes: 2 additions & 5 deletions optd-perftest/src/tpch.rs
@@ -49,7 +49,7 @@ pub struct TpchKit {
pub schema_fpath: PathBuf,
}

/// I keep the same conventions for these methods as I do for PostgresDb
/// I keep the same conventions for these methods as I do for PostgresDBMS
impl TpchKit {
pub fn build<P: AsRef<Path>>(workspace_dpath: P) -> io::Result<Self> {
log::debug!("[start] building TpchKit");
@@ -193,10 +193,7 @@ impl TpchKit {
/// If two TpchConfig instances would *not always* generate the same data, then their
/// directory names must be different.
fn get_this_genned_tables_dpath(&self, tpch_config: &TpchConfig) -> PathBuf {
let dname = format!(
"db{}_sf{}_sd{}",
tpch_config.dbms, tpch_config.scale_factor, tpch_config.seed
);
let dname = format!("db{}_sf{}", tpch_config.dbms, tpch_config.scale_factor,);
self.genned_tables_dpath.join(dname)
}

