Skip to content
This repository was archived by the owner on Jan 7, 2025. It is now read-only.

Commit 413c5ef

Browse files
feat: load tpch into postgres (#95)
**Summary**: Code for installing, initializing, and running Postgres, as well as for loading TPC-H into Postgres.

**Demo**: The TPC-H tables and queries are already generated. In the first run, we load the TPC-H data into Postgres. In the second run, we just use the already loaded `pgdata/`.

https://github.com/cmu-db/optd/assets/20631215/a412185c-7c61-436f-9232-2b0d61863c43

**Details**:
* Postgres installation, initialization, and startup are all cached and only done if necessary
* Loading benchmarks is cached as well, and the cache is based on the configuration of the benchmark _only if the benchmark is read-only_
* As before, the caching logic is atomic
* Benchmarks and benchmark configs are extensible, including the caching logic (via `get_strid()`)
1 parent cb6a205 commit 413c5ef

12 files changed

+499
-202
lines changed

Diff for: .gitignore

+3-1
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,6 @@
55
.history
66
optd-perftest/**/genned_tables
77
optd-perftest/**/genned_queries
8-
optd-perftest/**/tpch-kit
8+
optd-perftest/**/tpch-kit
9+
optd-perftest/**/pgdata
10+
optd-perftest/**/postgres_log

Diff for: Cargo.lock

+7
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Diff for: optd-perftest/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,4 @@ tokio = { version = "1.24", features = [
1717
"sync",
1818
"parking_lot",
1919
] }
20+
shlex = "1.3"

Diff for: optd-perftest/src/benchmark.rs

+22
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
use crate::tpch::TpchConfig;
2+
3+
pub enum Benchmark {
4+
Test,
5+
Tpch(TpchConfig),
6+
}
7+
8+
impl Benchmark {
9+
pub fn get_strid(&self) -> String {
10+
match self {
11+
Self::Test => String::from("test"),
12+
Self::Tpch(tpch_cfg) => format!("tpch_{}", tpch_cfg.get_strid()),
13+
}
14+
}
15+
16+
pub fn is_readonly(&self) -> bool {
17+
match self {
18+
Self::Test => true,
19+
Self::Tpch(_) => true,
20+
}
21+
}
22+
}

Diff for: optd-perftest/src/cardtest.rs

+2-4
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
use anyhow::{self};
22
use async_trait::async_trait;
33

4+
use crate::benchmark::Benchmark;
5+
46
/// This struct performs cardinality testing across one or more databases.
57
/// Another design would be for the CardtestRunnerDBHelper trait to expose a function
68
/// to evaluate the Q-error. However, I chose not to do this design for reasons
@@ -10,10 +12,6 @@ pub struct CardtestRunner {
1012
pub databases: Vec<Box<dyn CardtestRunnerDBHelper>>,
1113
}
1214

13-
pub enum Benchmark {
14-
Test,
15-
}
16-
1715
impl CardtestRunner {
1816
pub async fn new(databases: Vec<Box<dyn CardtestRunnerDBHelper>>) -> anyhow::Result<Self> {
1917
Ok(CardtestRunner { databases })

Diff for: optd-perftest/src/datafusion_db_cardtest.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::cardtest::{Benchmark, CardtestRunnerDBHelper};
1+
use crate::{benchmark::Benchmark, cardtest::CardtestRunnerDBHelper};
22
use async_trait::async_trait;
33
use optd_sqlplannertest::DatafusionDb;
44

@@ -14,6 +14,7 @@ impl CardtestRunnerDBHelper for DatafusionDb {
1414
self.execute("CREATE TABLE t1 (c1 INT);", true).await?;
1515
self.execute("INSERT INTO t1 VALUES (0);", true).await?;
1616
}
17+
_ => unimplemented!(),
1718
};
1819
Ok(())
1920
}

Diff for: optd-perftest/src/main.rs

+20-11
Original file line numberDiff line numberDiff line change
@@ -4,28 +4,37 @@ use optd_sqlplannertest::DatafusionDb;
44
use postgres_db::PostgresDb;
55

66
use crate::{
7-
cardtest::Benchmark,
8-
tpch_kit::{TpchKit, TPCH_KIT_POSTGRES},
7+
benchmark::Benchmark,
8+
tpch::{TpchConfig, TpchKit, TPCH_KIT_POSTGRES},
99
};
1010

11+
mod benchmark;
1112
mod cardtest;
12-
mod cmd;
1313
mod datafusion_db_cardtest;
1414
mod postgres_db;
15-
mod tpch_kit;
15+
mod shell;
16+
mod tpch;
1617

1718
#[tokio::main]
1819
async fn main() -> Result<()> {
19-
let databases: Vec<Box<dyn CardtestRunnerDBHelper>> = vec![
20-
Box::new(PostgresDb::new().await?),
21-
Box::new(DatafusionDb::new().await?),
22-
];
20+
let pg_db = PostgresDb::build(true).await?;
21+
let tpch_cfg = TpchConfig {
22+
database: String::from(TPCH_KIT_POSTGRES),
23+
scale_factor: 1,
24+
seed: 15721,
25+
};
26+
let tpch_benchmark = Benchmark::Tpch(tpch_cfg.clone());
27+
pg_db.load_database(&tpch_benchmark).await?;
28+
if true {
29+
return Ok(());
30+
}
31+
let df_db = DatafusionDb::new().await?;
32+
let databases: Vec<Box<dyn CardtestRunnerDBHelper>> = vec![Box::new(pg_db), Box::new(df_db)];
2333
let cardtest_runner = CardtestRunner::new(databases).await?;
2434
cardtest_runner.load_databases(Benchmark::Test).await?;
2535
let qerrors = cardtest_runner.eval_qerrors("SELECT * FROM t1;").await?;
2636
println!("qerrors: {:?}", qerrors);
27-
let kit = TpchKit::build(true).unwrap();
28-
kit.gen_tables(TPCH_KIT_POSTGRES, 1)?;
29-
kit.gen_queries(TPCH_KIT_POSTGRES, 1, 15721)?;
37+
let tpch_kit = TpchKit::build(true)?;
38+
tpch_kit.gen_queries(&tpch_cfg)?;
3039
Ok(())
3140
}

Diff for: optd-perftest/src/postgres_db.rs

+221-5
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,205 @@
1-
use crate::cardtest::{Benchmark, CardtestRunnerDBHelper};
1+
use crate::{
2+
benchmark::Benchmark,
3+
cardtest::CardtestRunnerDBHelper,
4+
shell,
5+
tpch::{TpchConfig, TpchKit},
6+
};
27
use anyhow::Result;
38
use async_trait::async_trait;
9+
use std::{
10+
env::{self, consts::OS},
11+
fs::{self, File},
12+
path::{Path, PathBuf},
13+
process::Command,
14+
};
415

5-
pub struct PostgresDb {}
16+
const OPTD_DB_NAME: &str = "optd";
617

18+
pub struct PostgresDb {
19+
verbose: bool,
20+
21+
// cache these paths so we don't have to build them multiple times
22+
_postgres_db_dpath: PathBuf,
23+
pgdata_dpath: PathBuf,
24+
log_fpath: PathBuf,
25+
}
26+
27+
/// Conventions I keep for methods of this class:
28+
/// - Functions should be idempotent. For instance, start_postgres() should not fail if Postgres is already running
29+
/// - Stop and start functions should be separate
30+
/// - Setup should be done in build() unless it requires more information (like benchmark)
731
impl PostgresDb {
8-
pub async fn new() -> Result<Self> {
9-
Ok(PostgresDb {})
32+
pub async fn build(verbose: bool) -> Result<Self> {
33+
// build paths, sometimes creating them if they don't exist
34+
let curr_dpath = env::current_dir()?;
35+
let postgres_db_dpath = Path::new(file!())
36+
.parent()
37+
.unwrap()
38+
.join("postgres_db")
39+
.to_path_buf();
40+
let postgres_db_dpath = curr_dpath.join(postgres_db_dpath); // make it absolute
41+
if !postgres_db_dpath.exists() {
42+
fs::create_dir(&postgres_db_dpath)?;
43+
}
44+
let pgdata_dpath = postgres_db_dpath.join("pgdata");
45+
let log_fpath = postgres_db_dpath.join("postgres_log");
46+
47+
// create Self
48+
let db = PostgresDb {
49+
verbose,
50+
_postgres_db_dpath: postgres_db_dpath,
51+
pgdata_dpath,
52+
log_fpath,
53+
};
54+
55+
// (re)start postgres
56+
db.install_postgres().await?;
57+
db.init_pgdata().await?;
58+
db.start_postgres().await?;
59+
60+
Ok(db)
61+
}
62+
63+
/// Installs an up-to-date version of Postgres using the OS's package manager
64+
async fn install_postgres(&self) -> Result<()> {
65+
match OS {
66+
"macos" => {
67+
if self.verbose {
68+
println!("updating and upgrading brew...");
69+
}
70+
shell::run_command_with_status_check("brew update")?;
71+
shell::run_command_with_status_check("brew upgrade")?;
72+
73+
if self.verbose {
74+
println!("installing postgresql...");
75+
}
76+
shell::run_command_with_status_check("brew install postgresql")?;
77+
}
78+
_ => unimplemented!(),
79+
};
80+
Ok(())
81+
}
82+
83+
/// Remove the pgdata dir, making sure to stop a running Postgres process if there is one
84+
/// If there is a Postgres process running on pgdata, it's important to stop it to avoid
85+
/// corrupting it (not stopping it leads to lots of weird behavior)
86+
async fn remove_pgdata(&self) -> Result<()> {
87+
if PostgresDb::get_is_postgres_running()? {
88+
self.stop_postgres().await?;
89+
}
90+
shell::make_into_empty_dir(&self.pgdata_dpath)?;
91+
Ok(())
92+
}
93+
94+
/// Initializes pgdata_dpath directory if it wasn't already initialized
95+
async fn init_pgdata(&self) -> Result<()> {
96+
let done_fpath = self.pgdata_dpath.join("initdb_done");
97+
if !done_fpath.exists() {
98+
if self.verbose {
99+
println!("running initdb...");
100+
}
101+
shell::make_into_empty_dir(&self.pgdata_dpath)?;
102+
shell::run_command_with_status_check(&format!(
103+
"initdb {}",
104+
self.pgdata_dpath.to_str().unwrap()
105+
))?;
106+
File::create(done_fpath)?;
107+
} else {
108+
#[allow(clippy::collapsible_else_if)]
109+
if self.verbose {
110+
println!("skipped running initdb");
111+
}
112+
}
113+
Ok(())
114+
}
115+
116+
/// Start the Postgres process if it's not already started
117+
/// It will always be started using the pg_ctl binary installed with the package manager
118+
/// It will always be started on port 5432
119+
async fn start_postgres(&self) -> Result<()> {
120+
if !PostgresDb::get_is_postgres_running()? {
121+
if self.verbose {
122+
println!("starting postgres...");
123+
}
124+
shell::run_command_with_status_check(&format!(
125+
"pg_ctl -D{} -l{} start",
126+
self.pgdata_dpath.to_str().unwrap(),
127+
self.log_fpath.to_str().unwrap()
128+
))?;
129+
} else {
130+
#[allow(clippy::collapsible_else_if)]
131+
if self.verbose {
132+
println!("skipped starting postgres");
133+
}
134+
}
135+
136+
Ok(())
137+
}
138+
139+
/// Stop the Postgres process started by start_postgres()
140+
async fn stop_postgres(&self) -> Result<()> {
141+
if PostgresDb::get_is_postgres_running()? {
142+
if self.verbose {
143+
println!("stopping postgres...");
144+
}
145+
shell::run_command_with_status_check(&format!(
146+
"pg_ctl -D{} stop",
147+
self.pgdata_dpath.to_str().unwrap()
148+
))?;
149+
} else {
150+
#[allow(clippy::collapsible_else_if)]
151+
if self.verbose {
152+
println!("skipped stopping postgres");
153+
}
154+
}
155+
156+
Ok(())
157+
}
158+
159+
/// Check whether postgres is running
160+
fn get_is_postgres_running() -> Result<bool> {
161+
Ok(Command::new("pg_isready").output()?.status.success())
162+
}
163+
164+
/// Load the benchmark data without worrying about caching
165+
async fn load_benchmark_data_raw(&self, benchmark: &Benchmark) -> Result<()> {
166+
match benchmark {
167+
Benchmark::Tpch(tpch_cfg) => self.load_tpch_data(tpch_cfg).await?,
168+
_ => unimplemented!(),
169+
};
170+
Ok(())
171+
}
172+
173+
async fn load_tpch_data(&self, tpch_cfg: &TpchConfig) -> Result<()> {
174+
// start from a clean slate
175+
self.remove_pgdata().await?;
176+
// since we deleted pgdata we'll need to re-init it
177+
self.init_pgdata().await?;
178+
// postgres must be started again since remove_pgdata() stops it
179+
self.start_postgres().await?;
180+
// load the schema. createdb should not fail since we just make a fresh pgdata
181+
shell::run_command_with_status_check(&format!("createdb {}", OPTD_DB_NAME))?;
182+
let tpch_kit = TpchKit::build(self.verbose)?;
183+
tpch_kit.gen_tables(tpch_cfg)?;
184+
shell::run_command_with_status_check(&format!(
185+
"psql {} -f {}",
186+
OPTD_DB_NAME,
187+
tpch_kit.schema_fpath.to_str().unwrap()
188+
))?;
189+
let tbl_fpath_iter = tpch_kit.get_tbl_fpath_iter(tpch_cfg).unwrap();
190+
for tbl_fpath in tbl_fpath_iter {
191+
let tbl_name = tbl_fpath.file_stem().unwrap().to_str().unwrap();
192+
let copy_table_cmd = format!(
193+
"\\copy {} from {} csv delimiter '|'",
194+
tbl_name,
195+
tbl_fpath.to_str().unwrap()
196+
);
197+
shell::run_command_with_status_check(&format!(
198+
"psql {} -c \"{}\"",
199+
OPTD_DB_NAME, copy_table_cmd
200+
))?;
201+
}
202+
Ok(())
10203
}
11204
}
12205

@@ -16,7 +209,30 @@ impl CardtestRunnerDBHelper for PostgresDb {
16209
"Postgres"
17210
}
18211

19-
async fn load_database(&self, _benchmark: &Benchmark) -> anyhow::Result<()> {
212+
/// Load the data of a benchmark with parameters
213+
/// As an optimization, if this benchmark only has read-only queries and the
214+
/// currently loaded data was produced by the same benchmark and parameters, we don't
215+
/// need to load it again
216+
async fn load_database(&self, benchmark: &Benchmark) -> anyhow::Result<()> {
217+
if benchmark.is_readonly() {
218+
let benchmark_strid = benchmark.get_strid();
219+
let done_fname = format!("{}_done", benchmark_strid);
220+
let done_fpath = self.pgdata_dpath.join(done_fname);
221+
if !done_fpath.exists() {
222+
if self.verbose {
223+
println!("loading data for {}...", benchmark_strid);
224+
}
225+
self.load_benchmark_data_raw(benchmark).await?;
226+
File::create(done_fpath)?;
227+
} else {
228+
#[allow(clippy::collapsible_else_if)]
229+
if self.verbose {
230+
println!("skipped loading data for {}", benchmark_strid);
231+
}
232+
}
233+
} else {
234+
self.load_benchmark_data_raw(benchmark).await?
235+
}
20236
Ok(())
21237
}
22238

0 commit comments

Comments
 (0)