From a332018e99d6221f4681da36c63f2a26b05aa386 Mon Sep 17 00:00:00 2001
From: Patrick Wang
Date: Mon, 15 Apr 2024 09:27:38 -0400
Subject: [PATCH] feat: can load files > 1GB into postgres (#159)

**Summary**: Can now load files \> 1GB in size by splitting them into 2MB chunks for `COPY FROM STDIN`.

**Demo**: JOB's `cast_info.csv` is 1.3GB and has 36244344 rows (image 1). All 36244344 rows are loaded successfully (image 2).

![Screenshot 2024-04-14 at 18 06 22](https://github.com/cmu-db/optd/assets/20631215/c97c6649-548e-488a-ae6f-a0f9c458044a)

![Screenshot 2024-04-14 at 18 06 36](https://github.com/cmu-db/optd/assets/20631215/99967024-5631-4e53-a2e1-c7f0088da1f2)

**Details**:
* Postgres does not allow statements that exceed 1GB in size. Since we previously sent the entire file in a single statement, this caused Postgres to cancel the transaction.
* This is fixed by splitting files into 2MB chunks and issuing a separate `COPY FROM STDIN` statement for each chunk (a standalone sketch of the chunk-boundary handling appears after the diff).
* We also solved another issue along the way: we can now load files that would not fit in memory.
---
 Cargo.lock                                  |  34 +++
 optd-perftest/Cargo.toml                    |   1 +
 optd-perftest/src/benchmark.rs              |  30 +--
 optd-perftest/src/cardtest.rs               |  10 +-
 optd-perftest/src/datafusion_dbms.rs        |   2 +-
 optd-perftest/src/job.rs                    |  18 +-
 optd-perftest/src/lib.rs                    |   2 +-
 optd-perftest/src/main.rs                   | 216 ++++++++++++--------
 optd-perftest/src/postgres_dbms.rs          | 115 ++++++++---
 optd-perftest/src/tpch.rs                   |  15 +-
 optd-perftest/src/truecard.rs               |  10 +-
 optd-perftest/tests/cardtest_integration.rs |  13 +-
 12 files changed, 304 insertions(+), 162 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index cf18c0ea..e1a68a5f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2881,6 +2881,7 @@ dependencies = [
  "serde_json",
  "shlex",
  "statistical",
+ "test-case",
  "tokio",
  "tokio-postgres",
  "tokio-util",
@@ -4183,6 +4184,39 @@ version = "0.4.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76"

+[[package]]
+name = "test-case"
+version = "3.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "eb2550dd13afcd286853192af8601920d959b14c401fcece38071d53bf0768a8"
+dependencies = [
+ "test-case-macros",
+]
+
+[[package]]
+name = "test-case-core"
+version = "3.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "adcb7fd841cd518e279be3d5a3eb0636409487998a4aff22f3de87b81e88384f"
+dependencies = [
+ "cfg-if",
+ "proc-macro2 1.0.78",
+ "quote 1.0.35",
+ "syn 2.0.48",
+]
+
+[[package]]
+name = "test-case-macros"
+version = "3.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5c89e72a01ed4c579669add59014b9a524d609c0c88c6a585ce37485879f6ffb"
+dependencies = [
+ "proc-macro2 1.0.78",
+ "quote 1.0.35",
+ "syn 2.0.48",
+ "test-case-core",
+]
+
 [[package]]
 name = "textwrap"
 version = "0.16.0"
diff --git a/optd-perftest/Cargo.toml b/optd-perftest/Cargo.toml
index 06d33fd2..5763a39b 100644
--- a/optd-perftest/Cargo.toml
+++ b/optd-perftest/Cargo.toml
@@ -43,6 +43,7 @@ statistical = "1.0"
 prettytable-rs = "0.10"
 serde = "1.0"
 serde_json = "1.0"
+test-case = "3.3"

 [dev_dependencies]
 assert_cmd = "2.0"
diff --git a/optd-perftest/src/benchmark.rs b/optd-perftest/src/benchmark.rs
index 1056b198..69ea18ab 100644
--- a/optd-perftest/src/benchmark.rs
+++ b/optd-perftest/src/benchmark.rs
@@ -1,11 +1,10 @@
-use crate::tpch::TpchConfig;
+use crate::{job::JobConfig, tpch::TpchConfig};
 use serde::{Deserialize, Serialize};

 #[derive(Deserialize, Serialize)]
 pub enum Benchmark {
-    #[allow(dead_code)]
-    Test,
     Tpch(TpchConfig),
+    Job(JobConfig),
 }

 impl Benchmark {
@@ -25,40 +24,29 @@ impl Benchmark {
     /// lowercase. To resolve the inconsistency, the names output by this function will
     /// *not* contain uppercase letters.
     pub fn get_dbname(&self) -> String {
-        let dbname = match self {
-            Self::Test => String::from("test"),
-            Self::Tpch(tpch_config) => {
-                format!("tpch_sf{}", tpch_config.scale_factor)
-            }
-        };
+        let fname = self.get_fname();
         // since Postgres names cannot contain periods
-        let dbname = dbname.replace('.', "point");
+        let dbname = fname.replace('.', "point");
         // due to the weird inconsistency with Postgres (see function comment)
         dbname.to_lowercase()
     }

-    /// Use this when you need a unique file name. The rules for file names are different from the
-    /// rules for database names, so this is a different function
+    /// Use this when you need a unique file name to deterministically describe the "data"
+    /// of the benchmark. The rules for file names are different from the rules for
+    /// database names, so this is a different function.
    pub fn get_fname(&self) -> String {
         match self {
-            Self::Test => String::from("test"),
             Self::Tpch(tpch_config) => {
                 format!("tpch_sf{}", tpch_config.scale_factor)
             }
+            Self::Job(_) => String::from("job"),
         }
     }

-    /// An ID is just a unique string identifying the benchmark
-    /// It's not always used in the same situations as get_dbname(), so it's a separate function
-    pub fn get_id(&self) -> String {
-        // the fact that it happens to return dbname is an implementation detail
-        self.get_dbname()
-    }
-
     pub fn is_readonly(&self) -> bool {
         match self {
-            Self::Test => true,
             Self::Tpch(_) => true,
+            Self::Job(_) => true,
         }
     }
 }
diff --git a/optd-perftest/src/cardtest.rs b/optd-perftest/src/cardtest.rs
index 0b9158cb..e224ff27 100644
--- a/optd-perftest/src/cardtest.rs
+++ b/optd-perftest/src/cardtest.rs
@@ -3,7 +3,7 @@ use std::path::Path;

 use crate::postgres_dbms::PostgresDBMS;
 use crate::truecard::TruecardGetter;
-use crate::{benchmark::Benchmark, datafusion_dbms::DatafusionDBMS, tpch::TpchConfig};
+use crate::{benchmark::Benchmark, datafusion_dbms::DatafusionDBMS};

 use anyhow::{self};
 use async_trait::async_trait;
@@ -101,22 +101,22 @@ pub trait CardtestRunnerDBMSHelper {
     ) -> anyhow::Result<Vec<usize>>;
 }

-pub async fn cardtest<P: AsRef<Path>>(
+/// The core logic of cardinality testing.
+pub async fn cardtest_core<P: AsRef<Path>>(
     workspace_dpath: P,
     rebuild_cached_optd_stats: bool,
     pguser: &str,
     pgpassword: &str,
-    tpch_config: TpchConfig,
+    benchmark: Benchmark,
 ) -> anyhow::Result<HashMap<String, Vec<Cardinfo>>> {
     let pg_dbms = Box::new(PostgresDBMS::build(&workspace_dpath, pguser, pgpassword)?);
     let truecard_getter = pg_dbms.clone();
     let df_dbms = Box::new(DatafusionDBMS::new(&workspace_dpath, rebuild_cached_optd_stats).await?);
     let dbmss: Vec<Box<dyn CardtestRunnerDBMSHelper>> = vec![pg_dbms, df_dbms];

-    let tpch_benchmark = Benchmark::Tpch(tpch_config.clone());
     let mut cardtest_runner = CardtestRunner::new(dbmss, truecard_getter).await?;
     let cardinfos_alldbs = cardtest_runner
-        .eval_benchmark_cardinfos_alldbs(&tpch_benchmark)
+        .eval_benchmark_cardinfos_alldbs(&benchmark)
         .await?;
     Ok(cardinfos_alldbs)
 }
diff --git a/optd-perftest/src/datafusion_dbms.rs b/optd-perftest/src/datafusion_dbms.rs
index c8be3f93..89db005c 100644
--- a/optd-perftest/src/datafusion_dbms.rs
+++ b/optd-perftest/src/datafusion_dbms.rs
@@ -54,8 +54,8 @@ impl CardtestRunnerDBMSHelper for DatafusionDBMS {
         let tpch_kit = TpchKit::build(&self.workspace_dpath)?;
         self.create_tpch_tables(&tpch_kit).await?;
         match benchmark {
-            Benchmark::Test => unimplemented!(),
             Benchmark::Tpch(tpch_config) => self.eval_tpch_estcards(tpch_config).await,
+            Benchmark::Job(_job_config) => unimplemented!(),
         }
     }
 }
diff --git a/optd-perftest/src/job.rs b/optd-perftest/src/job.rs
index e71ae57f..a59a982e 100644
--- a/optd-perftest/src/job.rs
+++ b/optd-perftest/src/job.rs
@@ -13,7 +13,7 @@ const JOB_TABLES_URL: &str = "https://homepages.cwi.nl/~boncz/job/imdb.tgz";

 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub struct JobConfig {
-    pub query_ids: Vec<u32>,
+    pub query_ids: Vec<String>,
 }

 impl Display for JobConfig {
@@ -128,13 +128,17 @@ impl JobKit {
     pub fn get_sql_fpath_ordered_iter(
         &self,
         job_config: &JobConfig,
-    ) -> io::Result<impl Iterator<Item = (u32, PathBuf)>> {
+    ) -> io::Result<impl Iterator<Item = (String, PathBuf)>> {
         let queries_dpath = self.queries_dpath.clone();
-        let sql_fpath_ordered_iter = job_config
-            .query_ids
-            .clone()
-            .into_iter()
-            .map(move |query_id| (query_id, queries_dpath.join(format!("{}.sql", query_id))));
+        let sql_fpath_ordered_iter =
+            job_config
+                .query_ids
+                .clone()
+                .into_iter()
+                .map(move |query_id| {
+                    let this_genned_query_fpath = queries_dpath.join(format!("{}.sql", &query_id));
+                    (query_id, this_genned_query_fpath)
+                });
         Ok(sql_fpath_ordered_iter)
     }
 }
diff --git a/optd-perftest/src/lib.rs b/optd-perftest/src/lib.rs
index 058185ef..a02d3b96 100644
--- a/optd-perftest/src/lib.rs
+++ b/optd-perftest/src/lib.rs
@@ -1,4 +1,4 @@
-mod benchmark;
+pub mod benchmark;
 pub mod cardtest;
 mod datafusion_dbms;
 pub mod job;
diff --git a/optd-perftest/src/main.rs b/optd-perftest/src/main.rs
index 17fc284f..d9a2664a 100644
--- a/optd-perftest/src/main.rs
+++ b/optd-perftest/src/main.rs
@@ -1,9 +1,12 @@
-use clap::{Parser, Subcommand};
+use clap::{Parser, Subcommand, ValueEnum};
+use optd_perftest::benchmark::Benchmark;
 use optd_perftest::cardtest;
+use optd_perftest::job::JobConfig;
 use optd_perftest::shell;
 use optd_perftest::tpch::{TpchConfig, TPCH_KIT_POSTGRES};
 use prettytable::{format, Table};
 use std::fs;
+use std::path::Path;

 #[derive(Parser)]
 struct Cli {
@@ -18,9 +21,20 @@ struct Cli {
     command: Commands,
 }

+#[derive(Copy, Clone, Debug, ValueEnum)]
+enum BenchmarkName {
+    Tpch,
+    Job,
+}
+
 #[derive(Subcommand)]
 enum Commands {
     Cardtest {
+        #[clap(long)]
+        #[clap(value_enum)]
+        #[clap(default_value = "tpch")]
+        benchmark_name: BenchmarkName,
+
         #[clap(long)]
         #[clap(default_value = "0.01")]
         scale_factor: f64,
@@ -31,18 +45,18 @@
         #[clap(long)]
         #[clap(value_delimiter = ',', num_args = 1..)]
-        // this is the current list of all queries that work in perftest
+        // This is the current list of all queries that work in perftest
         #[clap(default_value = "2,3,5,6,7,8,9,10,11,12,13,14,17,19")]
         #[clap(help = "The queries to get the Q-error of")]
-        query_ids: Vec<u32>,
+        query_ids: Vec<String>,

         #[clap(long)]
         #[clap(action)]
         #[clap(help = "Whether to use the cached optd stats/cache generated stats")]
-        // this is an option because you want to make it true whenever you update the
-        // code for how stats are generated in optd, in order to not use cached stats
+        // This is an option because you want to make it true whenever you update the
+        // code for how stats are generated in optd, in order to not use cached stats.
         // I found that I almost always want to use the cache though, which is why the
-        // system will use the cache by default
+        // system will use the cache by default.
         rebuild_cached_optd_stats: bool,

         #[clap(long)]
@@ -57,11 +71,112 @@
     },
 }

-// q-errors are always >= 1.0 so two decimal points is enough
+/// Q-errors are always >= 1.0 so two decimal places is enough
 fn fmt_qerror(qerror: f64) -> String {
     format!("{:.2}", qerror)
 }

+/// cardtest::cardtest_core() expects sanitized inputs and returns outputs in their simplest form.
+/// This function wraps cardtest::cardtest_core() to sanitize the inputs and print the outputs nicely.
+#[allow(clippy::too_many_arguments)]
+async fn cardtest<P: AsRef<Path>>(
+    workspace_dpath: P,
+    benchmark_name: BenchmarkName,
+    scale_factor: f64,
+    seed: i32,
+    query_ids: Vec<String>,
+    rebuild_cached_optd_stats: bool,
+    pguser: String,
+    pgpassword: String,
+) -> anyhow::Result<()> {
+    let benchmark = match benchmark_name {
+        BenchmarkName::Tpch => Benchmark::Tpch(TpchConfig {
+            dbms: String::from(TPCH_KIT_POSTGRES),
+            scale_factor,
+            seed,
+            query_ids: query_ids.clone(),
+        }),
+        BenchmarkName::Job => Benchmark::Job(JobConfig {
+            query_ids: query_ids.clone(),
+        }),
+    };
+
+    let cardinfo_alldbs = cardtest::cardtest_core(
+        &workspace_dpath,
+        rebuild_cached_optd_stats,
+        &pguser,
+        &pgpassword,
+        benchmark,
+    )
+    .await?;
+
+    println!();
+    println!(" Aggregate Q-Error Comparison");
+    let mut agg_qerror_table = Table::new();
+    agg_qerror_table.set_titles(prettytable::row![
+        "DBMS", "Median", "# Inf", "Mean", "Min", "Max"
+    ]);
+    for (dbms, cardinfos) in &cardinfo_alldbs {
+        if !cardinfos.is_empty() {
+            let qerrors: Vec<f64> = cardinfos.iter().map(|cardinfo| cardinfo.qerror).collect();
+            let finite_qerrors: Vec<f64> = qerrors
+                .clone()
+                .into_iter()
+                .filter(|qerror| qerror.is_finite())
+                .collect();
+            let ninf_qerrors = qerrors.len() - finite_qerrors.len();
+            let mean_qerror = finite_qerrors.iter().sum::<f64>() / finite_qerrors.len() as f64;
+            let min_qerror = qerrors
+                .iter()
+                .min_by(|a, b| a.partial_cmp(b).unwrap())
+                .unwrap();
+            let median_qerror = statistical::median(&qerrors);
+            let max_qerror = qerrors
+                .iter()
+                .max_by(|a, b| a.partial_cmp(b).unwrap())
+                .unwrap();
+            agg_qerror_table.add_row(prettytable::row![
+                dbms,
+                fmt_qerror(median_qerror),
+                ninf_qerrors,
+                fmt_qerror(mean_qerror),
+                fmt_qerror(*min_qerror),
+                fmt_qerror(*max_qerror),
+            ]);
+        } else {
+            agg_qerror_table.add_row(prettytable::row![dbms, "N/A", "N/A", "N/A", "N/A", "N/A"]);
+        }
+    }
+    agg_qerror_table.set_format(*format::consts::FORMAT_NO_LINESEP_WITH_TITLE);
+    agg_qerror_table.printstd();
+
+    println!();
+    println!(" Per-Query Cardinality Info");
+    println!(" ===========================");
+    for (i, query_id) in query_ids.iter().enumerate() {
+        println!(" Query {}", query_id);
+        let mut this_query_cardinfo_table = Table::new();
+        this_query_cardinfo_table.set_titles(prettytable::row![
+            "DBMS",
+            "Q-Error",
+            "Est. Card.",
+            "True Card."
+        ]);
+        for (dbms, cardinfos) in &cardinfo_alldbs {
+            let this_query_cardinfo = cardinfos.get(i).unwrap();
+            this_query_cardinfo_table.add_row(prettytable::row![
+                dbms,
+                this_query_cardinfo.qerror,
+                this_query_cardinfo.estcard,
+                this_query_cardinfo.truecard
+            ]);
+        }
+        this_query_cardinfo_table.set_format(*format::consts::FORMAT_NO_LINESEP_WITH_TITLE);
+        this_query_cardinfo_table.printstd();
+    }
+    Ok(())
+}
+
 #[tokio::main]
 async fn main() -> anyhow::Result<()> {
     env_logger::init();
@@ -74,6 +189,7 @@ async fn main() -> anyhow::Result<()> {

     match cli.command {
         Commands::Cardtest {
+            benchmark_name,
             scale_factor,
             seed,
             query_ids,
@@ -81,89 +197,17 @@ async fn main() -> anyhow::Result<()> {
             pguser,
             pgpassword,
         } => {
-            let tpch_config = TpchConfig {
-                dbms: String::from(TPCH_KIT_POSTGRES),
+            cardtest(
+                workspace_dpath,
+                benchmark_name,
                 scale_factor,
                 seed,
-                query_ids: query_ids.clone(),
-            };
-            let cardinfo_alldbs = cardtest::cardtest(
-                &workspace_dpath,
+                query_ids,
                 rebuild_cached_optd_stats,
-                &pguser,
-                &pgpassword,
-                tpch_config,
+                pguser,
+                pgpassword,
             )
-            .await?;
-            println!();
-            println!(" Aggregate Q-Error Comparison");
-            let mut agg_qerror_table = Table::new();
-            agg_qerror_table.set_titles(prettytable::row![
-                "DBMS", "Median", "# Inf", "Mean", "Min", "Max"
-            ]);
-            for (dbms, cardinfos) in &cardinfo_alldbs {
-                if !cardinfos.is_empty() {
-                    let qerrors: Vec<f64> =
-                        cardinfos.iter().map(|cardinfo| cardinfo.qerror).collect();
-                    let finite_qerrors: Vec<f64> = qerrors
-                        .clone()
-                        .into_iter()
-                        .filter(|qerror| qerror.is_finite())
-                        .collect();
-                    let ninf_qerrors = qerrors.len() - finite_qerrors.len();
-                    let mean_qerror =
-                        finite_qerrors.iter().sum::<f64>() / finite_qerrors.len() as f64;
-                    let min_qerror = qerrors
-                        .iter()
-                        .min_by(|a, b| a.partial_cmp(b).unwrap())
-                        .unwrap();
-                    let median_qerror = statistical::median(&qerrors);
-                    let max_qerror = qerrors
-                        .iter()
-                        .max_by(|a, b| a.partial_cmp(b).unwrap())
-                        .unwrap();
-                    agg_qerror_table.add_row(prettytable::row![
-                        dbms,
-                        fmt_qerror(median_qerror),
-                        ninf_qerrors,
-                        fmt_qerror(mean_qerror),
-                        fmt_qerror(*min_qerror),
-                        fmt_qerror(*max_qerror),
-                    ]);
-                } else {
-                    agg_qerror_table
-                        .add_row(prettytable::row![dbms, "N/A", "N/A", "N/A", "N/A", "N/A"]);
-                }
-            }
-            agg_qerror_table.set_format(*format::consts::FORMAT_NO_LINESEP_WITH_TITLE);
-            agg_qerror_table.printstd();
-
-            println!();
-            println!(" Per-Query Cardinality Info");
-            println!(" ===========================");
-            for (i, query_id) in query_ids.iter().enumerate() {
-                println!(" Query {}", query_id);
-                let mut this_query_cardinfo_table = Table::new();
-                this_query_cardinfo_table.set_titles(prettytable::row![
-                    "DBMS",
-                    "Q-Error",
-                    "Est. Card.",
-                    "True Card."
-                ]);
-                for (dbms, cardinfos) in &cardinfo_alldbs {
-                    let this_query_cardinfo = cardinfos.get(i).unwrap();
-                    this_query_cardinfo_table.add_row(prettytable::row![
-                        dbms,
-                        this_query_cardinfo.qerror,
-                        this_query_cardinfo.estcard,
-                        this_query_cardinfo.truecard
-                    ]);
-                }
-                this_query_cardinfo_table.set_format(*format::consts::FORMAT_NO_LINESEP_WITH_TITLE);
-                this_query_cardinfo_table.printstd();
-            }
+            .await
         }
     }
-
-    Ok(())
 }
diff --git a/optd-perftest/src/postgres_dbms.rs b/optd-perftest/src/postgres_dbms.rs
index 1f9e867c..aae48860 100644
--- a/optd-perftest/src/postgres_dbms.rs
+++ b/optd-perftest/src/postgres_dbms.rs
@@ -1,6 +1,7 @@
 use crate::{
     benchmark::Benchmark,
     cardtest::CardtestRunnerDBMSHelper,
+    job::{JobConfig, JobKit},
     tpch::{TpchConfig, TpchKit},
     truecard::{TruecardCache, TruecardGetter},
 };
@@ -122,13 +123,14 @@ impl PostgresDBMS {
             let client = self.connect_to_db(&dbname).await?;
             match benchmark {
                 Benchmark::Tpch(tpch_config) => self.load_tpch_data(&client, tpch_config).await?,
-                _ => unimplemented!(),
+                Benchmark::Job(job_config) => self.load_job_data(&client, job_config).await?,
             };
             File::create(done_fpath).await?;
             log::debug!("[end] loading benchmark data");
         } else {
             log::debug!("[skip] loading benchmark data");
         }
+
         Ok(())
     }
@@ -150,11 +152,11 @@ impl PostgresDBMS {
         // load the tables
         tpch_kit.gen_tables(tpch_config)?;
         for tbl_fpath in tpch_kit.get_tbl_fpath_iter(tpch_config)? {
-            Self::copy_from_stdin(client, tbl_fpath).await?;
+            Self::copy_from_stdin(client, tbl_fpath, "|", "\\").await?;
         }

         // load the constraints and indexes
-        // TODO: constraints are currently broken
+        // TODO(phw2): constraints are currently broken
         let sql = fs::read_to_string(tpch_kit.constraints_fpath.to_str().unwrap())?;
         client.batch_execute(&sql).await?;
         let sql = fs::read_to_string(tpch_kit.indexes_fpath.to_str().unwrap())?;
@@ -168,30 +170,95 @@ impl PostgresDBMS {
         Ok(())
     }

+    /// Load the JOB data to the database that client is connected to
+    async fn load_job_data(&self, client: &Client, job_config: &JobConfig) -> anyhow::Result<()> {
+        // set up JobKit
+        let job_kit = JobKit::build(&self.workspace_dpath)?;
+
+        // load the schema
+        // we need to call make to ensure that the schema file exists
+        let sql = fs::read_to_string(job_kit.schema_fpath.to_str().unwrap())?;
+        client.batch_execute(&sql).await?;
+
+        // load the tables
+        job_kit.download_tables(job_config)?;
+        for tbl_fpath in job_kit.get_tbl_fpath_iter()? {
+            Self::copy_from_stdin(client, tbl_fpath, ",", "\\").await?;
+        }
+
+        // load the indexes
+        let sql = fs::read_to_string(job_kit.indexes_fpath.to_str().unwrap())?;
+        client.batch_execute(&sql).await?;
+
+        // create stats
+        // you need to do VACUUM FULL ANALYZE and not just ANALYZE to make sure the stats are created in a deterministic way
+        // this is standard practice for postgres benchmarking
+        client.query("VACUUM FULL ANALYZE", &[]).await?;
+
+        Ok(())
+    }
+
     /// Load a file into Postgres by sending the bytes through the network
     /// Unlike COPY ... FROM, COPY ... FROM STDIN works even if the Postgres process is on another machine or container
     async fn copy_from_stdin<P: AsRef<Path>>(
         client: &tokio_postgres::Client,
         tbl_fpath: P,
+        delimiter: &str,
+        escape: &str,
     ) -> anyhow::Result<()> {
-        // read file
+        // Setup
         let mut file = File::open(&tbl_fpath).await?;
-        let mut data = Vec::new();
-        file.read_to_end(&mut data).await?;
-        let cursor = Cursor::new(data);
-
-        // run copy from statement
-        let tbl_name = TpchKit::get_tbl_name_from_tbl_fpath(&tbl_fpath);
-        let stmt = client
-            .prepare(&format!(
-                "COPY {} FROM STDIN WITH (FORMAT csv, DELIMITER '|')",
-                tbl_name
-            ))
-            .await?;
-        let sink = client.copy_in(&stmt).await?;
-        futures::pin_mut!(sink);
-        sink.as_mut().start_send(cursor)?;
-        sink.finish().await?;
+        // Internally, File::read() seems to read at most 2MB at a time, so I set BUFFER_SIZE to be that.
+        const BUFFER_SIZE: usize = 2 * 1024 * 1024;
+        let mut extra_bytes_buffer = vec![];
+
+        // Read the file BUFFER_SIZE bytes at a time, sending one COPY FROM STDIN statement per chunk.
+        // BUFFER_SIZE must be < 1GB because sending a single statement that is >1GB in size
+        // causes Postgres to cancel the transaction.
+        loop {
+            // Add the extra bytes from last time and then read from the file to fill the buffer to at most BUFFER_SIZE
+            let mut buffer = vec![0u8; BUFFER_SIZE];
+            let num_extra_bytes = extra_bytes_buffer.len();
+            buffer.splice(0..num_extra_bytes, extra_bytes_buffer);
+            let num_bytes_read = file.read(&mut buffer[num_extra_bytes..]).await?;
+            let num_bytes_in_buffer = num_extra_bytes + num_bytes_read;
+
+            if num_bytes_in_buffer == 0 {
+                break;
+            }
+
+            // Truncate here to handle when file.read() encounters the end of the file.
+            buffer.truncate(num_bytes_in_buffer);
+
+            // Find the last newline in the buffer. Copy the extra data out and truncate the buffer.
+            extra_bytes_buffer = vec![];
+            let last_newline_idx = buffer.iter().rposition(|&x| x == b'\n');
+            // It's possible that the buffer doesn't contain any newlines if it only has the very last line of the file.
+            // In that case, we'll just assume it's the last line of the file and not truncate the buffer.
+            // It's also possible that we have a line that's too long, but it's easier to just let Postgres throw an
+            // error if this happens.
+            if let Some(last_newline_idx) = last_newline_idx {
+                let extra_bytes_start_idx = last_newline_idx + 1;
+                // Since we truncated buffer earlier, &buffer[extra_bytes_start_idx..] will not contain
+                // any bytes *not* in the file.
+                extra_bytes_buffer.extend_from_slice(&buffer[extra_bytes_start_idx..]);
+                buffer.truncate(extra_bytes_start_idx);
+            }
+
+            // Execute a COPY FROM STDIN statement with the buffer
+            let cursor = Cursor::new(buffer);
+            let tbl_name = TpchKit::get_tbl_name_from_tbl_fpath(&tbl_fpath);
+            let stmt = client
+                .prepare(&format!(
+                    "COPY {} FROM STDIN WITH (FORMAT csv, DELIMITER '{}', ESCAPE '{}')",
+                    tbl_name, delimiter, escape
+                ))
+                .await?;
+            let sink = client.copy_in(&stmt).await?;
+            futures::pin_mut!(sink);
+            sink.as_mut().start_send(cursor)?;
+            sink.finish().await?;
+        }

         Ok(())
     }
@@ -242,11 +309,11 @@ impl PostgresDBMS {
         let mut truecards = vec![];
         for (query_id, sql_fpath) in tpch_kit.get_sql_fpath_ordered_iter(tpch_config)? {
             let sql = fs::read_to_string(sql_fpath)?;
-            let truecard = match truecard_cache.get_truecard(dbname, query_id) {
+            let truecard = match truecard_cache.get_truecard(dbname, &query_id) {
                 Some(truecard) => truecard,
                 None => {
                     let truecard = self.eval_query_truecard(client, &sql).await?;
-                    truecard_cache.insert_truecard(dbname, query_id, truecard);
+                    truecard_cache.insert_truecard(dbname, &query_id, truecard);
                     truecard
                 }
             };
@@ -297,8 +364,8 @@ impl CardtestRunnerDBMSHelper for PostgresDBMS {
         let dbname = benchmark.get_dbname();
         let client = self.connect_to_db(&dbname).await?;
         match benchmark {
-            Benchmark::Test => unimplemented!(),
             Benchmark::Tpch(tpch_config) => self.eval_tpch_estcards(&client, tpch_config).await,
+            Benchmark::Job(_job_config) => unimplemented!(),
         }
     }
 }
@@ -322,11 +389,11 @@ impl TruecardGetter for PostgresDBMS {
         let client = self.connect_to_db(&dbname).await?;
         // all "eval_*" functions should add the truecards they find to the truecard cache
         match benchmark {
-            Benchmark::Test => unimplemented!(),
             Benchmark::Tpch(tpch_config) => {
                 self.eval_tpch_truecards(&client, tpch_config, &dbname, &mut truecard_cache)
                     .await
             }
+            Benchmark::Job(_job_config) => unimplemented!(),
         }
         // note that truecard_cache will save itself when it goes out of scope
     }
diff --git a/optd-perftest/src/tpch.rs b/optd-perftest/src/tpch.rs
index db091008..7ca140ee 100644
--- a/optd-perftest/src/tpch.rs
+++ b/optd-perftest/src/tpch.rs
@@ -19,7 +19,7 @@ pub struct TpchConfig {
     pub dbms: String,
     pub scale_factor: f64,
     pub seed: i32,
-    pub query_ids: Vec<u32>,
+    pub query_ids: Vec<String>,
 }

 impl Display for TpchConfig {
@@ -27,8 +27,8 @@ impl Display for TpchConfig {
         // Use write! macro to write formatted string to `f`
         write!(
             f,
-            "TpchConfig(scale_factor={}, seed={})",
-            self.scale_factor, self.seed
+            "TpchConfig(dbms={}, scale_factor={}, seed={}, query_ids={:?})",
+            self.dbms, self.scale_factor, self.seed, self.query_ids
         )
     }
 }
@@ -222,7 +222,7 @@ impl TpchKit {
     pub fn get_sql_fpath_ordered_iter(
         &self,
         tpch_config: &TpchConfig,
-    ) -> io::Result<impl Iterator<Item = (u32, PathBuf)>> {
+    ) -> io::Result<impl Iterator<Item = (String, PathBuf)>> {
         let this_genned_queries_dpath = self.get_this_genned_queries_dpath(tpch_config);
         let sql_fpath_ordered_iter = tpch_config
             .query_ids
             .clone()
             .into_iter()
             .map(move |query_id| {
-                (
-                    query_id,
-                    this_genned_queries_dpath.join(format!("{}.sql", query_id)),
-                )
+                let this_genned_query_fpath =
+                    this_genned_queries_dpath.join(format!("{}.sql", &query_id));
+                (query_id, this_genned_query_fpath)
             });
         Ok(sql_fpath_ordered_iter)
     }
diff --git a/optd-perftest/src/truecard.rs b/optd-perftest/src/truecard.rs
index c975ef18..4ca9ce9e 100644
--- a/optd-perftest/src/truecard.rs
+++ b/optd-perftest/src/truecard.rs
@@ -19,7 +19,7 @@ pub trait TruecardGetter {
 /// A cache that gets persisted to disk for the true cardinalities of all queries of all benchmarks
 pub struct TruecardCache {
     truecard_cache_fpath: PathBuf,
-    cache: HashMap<String, HashMap<u32, usize>>,
+    cache: HashMap<String, HashMap<String, usize>>,
 }

 impl TruecardCache {
@@ -38,7 +38,7 @@ impl TruecardCache {
         })
     }

-    pub fn insert_truecard(&mut self, dbname: &str, query_id: u32, truecard: usize) {
+    pub fn insert_truecard(&mut self, dbname: &str, query_id: &str, truecard: usize) {
         let db_cache = match self.cache.get_mut(dbname) {
             Some(db_cache) => db_cache,
             None => {
@@ -46,13 +46,13 @@ impl TruecardCache {
                 self.cache.get_mut(dbname).unwrap()
             }
         };
-        db_cache.insert(query_id, truecard);
+        db_cache.insert(String::from(query_id), truecard);
     }

-    pub fn get_truecard(&self, dbname: &str, query_id: u32) -> Option<usize> {
+    pub fn get_truecard(&self, dbname: &str, query_id: &str) -> Option<usize> {
         self.cache
             .get(dbname)
-            .and_then(|db_cache| db_cache.get(&query_id).copied())
+            .and_then(|db_cache| db_cache.get(query_id).copied())
     }

     pub fn save(&self) -> anyhow::Result<()> {
diff --git a/optd-perftest/tests/cardtest_integration.rs b/optd-perftest/tests/cardtest_integration.rs
index c1afc9ae..d103e7d4 100644
--- a/optd-perftest/tests/cardtest_integration.rs
+++ b/optd-perftest/tests/cardtest_integration.rs
@@ -12,15 +12,18 @@ mod tests {
     /// Make sure Postgres is running before this test is run
     /// The reason I don't start Postgres automatically is because everyone has a different
     /// preferred way of starting it (in Docker container, with Mac app, custom build, etc.)
-    #[test]
-    fn cli_run_cardtest_twice() {
+    /// It's important to exercise all the different benchmarks to make sure their respective
+    /// kits, loading logic, and execution logic are sound.
+    #[test_case::test_case("tpch")]
+    // #[test_case::test_case("job")]
+    fn cli_run_cardtest_twice(benchmark_name: &str) {
         // perform cleanup (clear workspace)
         let workspace_dpath = shell::parse_pathstr(WORKSPACE).unwrap();
         shell::make_into_empty_dir(&workspace_dpath).unwrap();

         // run command twice
         for i in 1..=2 {
-            let mut cmd = create_cardtest_run_cmd(false);
+            let mut cmd = create_cardtest_run_cmd(benchmark_name, false);
             let output = cmd.output().unwrap();
             assert!(
                 output.status.success(),
@@ -34,13 +37,15 @@ mod tests {
         fs::remove_dir_all(&workspace_dpath).unwrap();
     }

-    fn create_cardtest_run_cmd(debug_print: bool) -> Command {
+    fn create_cardtest_run_cmd(benchmark_name: &str, debug_print: bool) -> Command {
         let mut cmd = Command::cargo_bin("optd-perftest").unwrap();
         cmd.current_dir("..");
         cmd.args([
             "--workspace",
             WORKSPACE,
             "cardtest",
+            "--benchmark-name",
+            benchmark_name,
             // make sure scale factor is low so the test runs fast
             "--scale-factor",
             "0.01",
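---

For reviewers, here is a minimal standalone sketch (not part of the patch) of the chunk-boundary invariant that `copy_from_stdin` relies on: every chunk handed to `COPY ... FROM STDIN` must end on a row boundary, so any bytes after the last newline in a read are carried forward into the next chunk. The helper name `split_at_last_newline` is hypothetical, introduced here only for illustration; the patch inlines this logic in its read loop.

```rust
/// Hypothetical helper (illustration only): splits a freshly read buffer into
/// (bytes safe to send now, partial row to carry into the next read).
fn split_at_last_newline(mut buffer: Vec<u8>) -> (Vec<u8>, Vec<u8>) {
    match buffer.iter().rposition(|&b| b == b'\n') {
        // Everything after the last newline is an incomplete row; carry it over.
        Some(idx) => {
            let carry = buffer.split_off(idx + 1);
            (buffer, carry)
        }
        // No newline at all: as in the patch, assume this is the file's final
        // (unterminated) line and send it as-is.
        None => (buffer, Vec::new()),
    }
}

fn main() {
    // Two complete CSV rows plus the beginning of a third, as one read() might return.
    let chunk = b"1,alice\n2,bob\n3,car".to_vec();
    let (send_now, carry) = split_at_last_newline(chunk);
    assert_eq!(send_now, b"1,alice\n2,bob\n");
    assert_eq!(carry, b"3,car");
    println!("send {} bytes now, carry {} bytes forward", send_now.len(), carry.len());
}
```

Because each 2MB buffer is re-split this way before being sent, no single `COPY FROM STDIN` statement can approach Postgres's 1GB statement limit, and only one chunk (plus at most one partial row) is ever held in memory at a time.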