
Commit 9380a7f

feat: JOB kit (#157)
**Summary**: Implemented the "kit" (tables and queries) for JOB.

**Demo**: The tables and queries in the workspace.

![Screenshot 2024-04-13 at 15 30 22](https://github.com/cmu-db/optd/assets/20631215/46ee6f67-eb69-4fef-8656-6bc46057bcb6)

**Details**:
* I used a personal fork of [join-order-benchmark](https://github.com/gregrahn/join-order-benchmark) in case we need to change it.
* I factored `clonepull_repo()` out into a standalone helper function.
* I also made `run_command_with_status_check_in_dir()` more ergonomic by refactoring it to take the directory directly.
1 parent c288d6f · commit 9380a7f

File tree

* optd-datafusion-repr/src/cost/base_cost/filter.rs
* optd-perftest/src/job.rs
* optd-perftest/src/lib.rs
* optd-perftest/src/shell.rs
* optd-perftest/src/tpch.rs

5 files changed: +178 -31 lines changed

optd-datafusion-repr/src/cost/base_cost/filter.rs

-1

@@ -748,7 +748,6 @@ mod tests {
         ));
         let expr_tree = bin_op(BinOpType::Lt, col_ref(0), cnst(Value::Int32(15)));
         let expr_tree_rev = bin_op(BinOpType::Geq, cnst(Value::Int32(15)), col_ref(0));
-        // TODO(phw2): make column_refs a function
         let column_refs = vec![ColumnRef::BaseTableColumnRef {
             table: String::from(TABLE1_NAME),
             col_idx: 0,

optd-perftest/src/job.rs

+140

@@ -0,0 +1,140 @@
+/// A wrapper around job-kit
+use serde::{Deserialize, Serialize};
+
+use crate::shell;
+use std::fmt::{self, Display, Formatter};
+use std::fs;
+use std::fs::File;
+use std::io;
+use std::path::{Path, PathBuf};
+
+const JOB_KIT_REPO_URL: &str = "https://github.com/wangpatrick57/job-kit.git";
+const JOB_TABLES_URL: &str = "https://homepages.cwi.nl/~boncz/job/imdb.tgz";
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub struct JobConfig {
+    pub query_ids: Vec<u32>,
+}
+
+impl Display for JobConfig {
+    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+        // Use write! macro to write formatted string to `f`
+        write!(f, "JobConfig(query_ids={:?})", self.query_ids,)
+    }
+}
+
+/// Provides many helper functions for running a JOB workload.
+/// It does not actually execute the queries as it is meant to be DBMS-agnostic.
+/// Is essentially a wrapper around the job-kit repo.
+/// Since it's conceptually a wrapper around the repo, I chose _not_ to make
+/// JobConfig an initialization parameter.
+pub struct JobKit {
+    _workspace_dpath: PathBuf,
+
+    // cache these paths so we don't have to build them multiple times
+    job_dpath: PathBuf,
+    job_kit_repo_dpath: PathBuf,
+    downloaded_tables_dpath: PathBuf,
+    queries_dpath: PathBuf,
+    pub schema_fpath: PathBuf,
+    pub indexes_fpath: PathBuf,
+}
+
+impl JobKit {
+    pub fn build<P: AsRef<Path>>(workspace_dpath: P) -> io::Result<Self> {
+        log::debug!("[start] building JobKit");
+
+        // build paths, sometimes creating them if they don't exist
+        let workspace_dpath = workspace_dpath.as_ref().to_path_buf();
+        let job_dpath = workspace_dpath.join("job");
+        if !job_dpath.exists() {
+            fs::create_dir(&job_dpath)?;
+        }
+        let job_kit_repo_dpath = job_dpath.join("job-kit");
+        let queries_dpath = job_kit_repo_dpath.join("queries");
+        let downloaded_tables_dpath = job_dpath.join("downloaded_tables");
+        if !downloaded_tables_dpath.exists() {
+            fs::create_dir(&downloaded_tables_dpath)?;
+        }
+        let schema_fpath = job_kit_repo_dpath.join("schema.sql");
+        let indexes_fpath = job_kit_repo_dpath.join("fkindexes.sql");
+
+        // create Self
+        let kit = JobKit {
+            _workspace_dpath: workspace_dpath,
+            job_dpath,
+            job_kit_repo_dpath,
+            queries_dpath,
+            downloaded_tables_dpath,
+            schema_fpath,
+            indexes_fpath,
+        };
+
+        // setup
+        shell::clonepull_repo(JOB_KIT_REPO_URL, &kit.job_kit_repo_dpath)?;
+
+        log::debug!("[end] building JobKit");
+        Ok(kit)
+    }
+
+    /// Download the .csv files for all tables of JOB
+    pub fn download_tables(&self, job_config: &JobConfig) -> io::Result<()> {
+        let done_fpath = self.downloaded_tables_dpath.join("download_tables_done");
+        if !done_fpath.exists() {
+            log::debug!("[start] downloading tables for {}", job_config);
+            // Instructions are from https://cedardb.com/docs/guides/example_datasets/job/, not from the job-kit repo.
+            shell::run_command_with_status_check_in_dir(
+                &format!("curl -O {JOB_TABLES_URL}"),
+                &self.job_dpath,
+            )?;
+            shell::make_into_empty_dir(&self.downloaded_tables_dpath)?;
+            shell::run_command_with_status_check_in_dir(
+                "tar -zxvf ../imdb.tgz",
+                &self.downloaded_tables_dpath,
+            )?;
+            shell::run_command_with_status_check_in_dir("rm imdb.tgz", &self.job_dpath)?;
+            File::create(done_fpath)?;
+            log::debug!("[end] downloading tables for {}", job_config);
+        } else {
+            log::debug!("[skip] downloading tables for {}", job_config);
+        }
+        Ok(())
+    }
+
+    /// Convert a tbl_fpath into the table name
+    pub fn get_tbl_name_from_tbl_fpath<P: AsRef<Path>>(tbl_fpath: P) -> String {
+        tbl_fpath
+            .as_ref()
+            .file_stem()
+            .unwrap()
+            .to_str()
+            .unwrap()
+            .to_string()
+    }
+
+    /// Get an iterator through all downloaded .csv files
+    pub fn get_tbl_fpath_iter(&self) -> io::Result<impl Iterator<Item = PathBuf>> {
+        let dirent_iter = fs::read_dir(&self.downloaded_tables_dpath)?;
+        // all results/options are fine to be unwrapped except for path.extension() because that could
+        // return None in various cases
+        let path_iter = dirent_iter.map(|dirent| dirent.unwrap().path());
+        let tbl_fpath_iter = path_iter
+            .filter(|path| path.extension().map(|ext| ext.to_str().unwrap()) == Some("csv"));
+        Ok(tbl_fpath_iter)
+    }
+
+    /// Get an iterator through all generated .sql files _in order_ of a given config.
+    /// It's important to iterate _in order_ due to the interface of CardtestRunnerDBMSHelper.
+    pub fn get_sql_fpath_ordered_iter(
+        &self,
+        job_config: &JobConfig,
+    ) -> io::Result<impl Iterator<Item = (u32, PathBuf)>> {
+        let queries_dpath = self.queries_dpath.clone();
+        let sql_fpath_ordered_iter = job_config
+            .query_ids
+            .clone()
+            .into_iter()
+            .map(move |query_id| (query_id, queries_dpath.join(format!("{}.sql", query_id))));
+        Ok(sql_fpath_ordered_iter)
+    }
+}

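To make the new API concrete, here is a minimal usage sketch based only on the functions added in `optd-perftest/src/job.rs`; the crate path `optd_perftest::job`, the workspace directory, and the chosen query IDs are illustrative assumptions rather than part of this commit.

```rust
use std::path::Path;

use optd_perftest::job::{JobConfig, JobKit};

fn main() -> std::io::Result<()> {
    // Hypothetical workspace directory; the real caller sets this up elsewhere.
    let workspace_dpath = Path::new("optd_perftest_workspace");

    // Clones/pulls the job-kit repo and lays out the cached paths.
    let kit = JobKit::build(workspace_dpath)?;

    // Example query IDs only; the real set is chosen by the caller.
    let job_config = JobConfig {
        query_ids: (1..=5).collect(),
    };

    // Downloads and unpacks imdb.tgz once, guarded by a "done" marker file.
    kit.download_tables(&job_config)?;

    // Iterate the downloaded .csv tables and the .sql queries in order.
    for tbl_fpath in kit.get_tbl_fpath_iter()? {
        let tbl_name = JobKit::get_tbl_name_from_tbl_fpath(&tbl_fpath);
        println!("table {} -> {}", tbl_name, tbl_fpath.display());
    }
    for (query_id, sql_fpath) in kit.get_sql_fpath_ordered_iter(&job_config)? {
        println!("query {} -> {}", query_id, sql_fpath.display());
    }
    Ok(())
}
```

The sketch only exercises the build → download → iterate flow shown in the diff above; it does not execute any queries, matching the module's DBMS-agnostic design.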
optd-perftest/src/lib.rs

+1

@@ -1,6 +1,7 @@
 mod benchmark;
 pub mod cardtest;
 mod datafusion_dbms;
+pub mod job;
 mod postgres_dbms;
 pub mod shell;
 pub mod tpch;

optd-perftest/src/shell.rs

+30 -1

@@ -6,11 +6,20 @@ use std::{fs, io};
 /// Runs a command, exiting the program immediately if the command fails
 pub fn run_command_with_status_check(cmd_str: &str) -> io::Result<Output> {
     // we need to bind it to some arbitrary type that implements AsRef<Path>. I just chose &Path
-    run_command_with_status_check_in_dir::<&Path>(cmd_str, None)
+    run_command_with_status_check_core::<&Path>(cmd_str, None)
 }
 
 /// Runs a command in a directory, exiting the program immediately if the command fails
 pub fn run_command_with_status_check_in_dir<P: AsRef<Path>>(
+    cmd_str: &str,
+    in_path: P,
+) -> io::Result<Output> {
+    run_command_with_status_check_core::<P>(cmd_str, Some(in_path))
+}
+
+/// This function exposes all the different ways to run a command, but the interface is not ergonomic.
+/// The ergonomic wrappers above are a workaround for Rust not having default values on parameters.
+pub fn run_command_with_status_check_core<P: AsRef<Path>>(
     cmd_str: &str,
     in_path: Option<P>,
 ) -> io::Result<Output> {
@@ -79,3 +88,23 @@ pub fn parse_pathstr(pathstr: &str) -> io::Result<PathBuf> {
     };
     Ok(path)
 }
+
+/// Get a repo to its latest state by either cloning or pulling
+pub fn clonepull_repo<P: AsRef<Path>>(repo_url: &str, repo_dpath: P) -> io::Result<()> {
+    if !repo_dpath.as_ref().exists() {
+        log::debug!("[start] cloning {} repo", repo_url);
+        run_command_with_status_check(&format!(
+            "git clone {} {}",
+            repo_url,
+            repo_dpath.as_ref().to_str().unwrap()
+        ))?;
+        log::debug!("[end] cloning {} repo", repo_url);
+    } else {
+        log::debug!("[skip] cloning {} repo", repo_url);
+    }
+    log::debug!("[start] pulling latest {} repo", repo_url);
+    run_command_with_status_check_in_dir("git pull", &repo_dpath)?;
+    log::debug!("[end] pulling latest {} repo", repo_url);
+    // make sure to do this so that get_optd_root() doesn't break
+    Ok(())
+}

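For clarity, here is a short sketch of how the reworked shell helpers compose: `clonepull_repo()` handles clone-or-pull, and `run_command_with_status_check_in_dir()` now takes the directory directly instead of an `Option`. The repo URL, the directory, and the `sync_and_build_example` function below are placeholders for illustration only.

```rust
use std::io;
use std::path::Path;

use optd_perftest::shell;

fn sync_and_build_example() -> io::Result<()> {
    // Placeholder repo directory for illustration.
    let repo_dpath = Path::new("/tmp/example-kit");

    // Clone on first use, otherwise pull, using the new standalone helper.
    shell::clonepull_repo("https://github.com/example/example-kit.git", repo_dpath)?;

    // The ergonomic wrapper now takes the directory directly rather than an Option.
    shell::run_command_with_status_check_in_dir("make", repo_dpath)?;

    // The no-directory variant still covers commands run from the current directory.
    shell::run_command_with_status_check("git --version")?;

    Ok(())
}
```

The non-ergonomic `run_command_with_status_check_core()` stays available for callers that genuinely need the `Option<P>` form.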
optd-perftest/src/tpch.rs

+7 -29

@@ -1,7 +1,7 @@
+/// A wrapper around tpch-kit
 use serde::{Deserialize, Serialize};
 
 use crate::shell;
-/// A wrapper around tpch-kit (https://github.com/gregrahn/tpch-kit)
 use std::env;
 use std::env::consts::OS;
 use std::fmt::{self, Display, Formatter};
@@ -53,7 +53,6 @@ pub struct TpchKit {
     pub indexes_fpath: PathBuf,
 }
 
-/// I keep the same conventions for these methods as I do for PostgresDBMS
 impl TpchKit {
     pub fn build<P: AsRef<Path>>(workspace_dpath: P) -> io::Result<Self> {
         log::debug!("[start] building TpchKit");
@@ -93,44 +92,23 @@ impl TpchKit {
             indexes_fpath,
         };
 
-        // set envvars (DSS_PATH can change so we don't set it now)
+        // setup
         env::set_var("DSS_CONFIG", kit.dbgen_dpath.to_str().unwrap());
         env::set_var("DSS_QUERY", kit.queries_dpath.to_str().unwrap());
-
-        // do setup after creating kit
-        kit.clonepull_tpch_kit_repo()?;
+        shell::clonepull_repo(TPCH_KIT_REPO_URL, &kit.tpch_kit_repo_dpath)?;
 
         log::debug!("[end] building TpchKit");
         Ok(kit)
     }
 
-    fn clonepull_tpch_kit_repo(&self) -> io::Result<()> {
-        if !self.tpch_kit_repo_dpath.exists() {
-            log::debug!("[start] cloning tpch-kit repo");
-            shell::run_command_with_status_check(&format!(
-                "git clone {} {}",
-                TPCH_KIT_REPO_URL,
-                self.tpch_kit_repo_dpath.to_str().unwrap()
-            ))?;
-            log::debug!("[end] cloning tpch-kit repo");
-        } else {
-            log::debug!("[skip] cloning tpch-kit repo");
-        }
-        log::debug!("[start] pulling latest tpch-kit repo");
-        shell::run_command_with_status_check_in_dir("git pull", Some(&self.tpch_kit_repo_dpath))?;
-        log::debug!("[end] pulling latest tpch-kit repo");
-        // make sure to do this so that get_optd_root() doesn't break
-        Ok(())
-    }
-
     pub fn make(&self, dbms: &str) -> io::Result<()> {
         log::debug!("[start] building dbgen");
         // we need to call "make clean" because we might have called make earlier with
         // a different dbms
-        shell::run_command_with_status_check_in_dir("make clean", Some(&self.dbgen_dpath))?;
+        shell::run_command_with_status_check_in_dir("make clean", &self.dbgen_dpath)?;
         shell::run_command_with_status_check_in_dir(
             &format!("make MACHINE={} DATABASE={}", TpchKit::get_machine(), dbms),
-            Some(&self.dbgen_dpath),
+            &self.dbgen_dpath,
         )?;
         log::debug!("[end] building dbgen");
         Ok(())
@@ -156,7 +134,7 @@ impl TpchKit {
         log::debug!("[start] generating tables for {}", tpch_config);
         shell::run_command_with_status_check_in_dir(
             &format!("./dbgen -s{}", tpch_config.scale_factor),
-            Some(&self.dbgen_dpath),
+            &self.dbgen_dpath,
         )?;
         File::create(done_fpath)?;
         log::debug!("[end] generating tables for {}", tpch_config);
@@ -181,7 +159,7 @@ impl TpchKit {
                 "./qgen -s{} -r{} {}",
                 tpch_config.scale_factor, tpch_config.seed, query_i
             ),
-            Some(&self.dbgen_dpath),
+            &self.dbgen_dpath,
         )?;
         let this_genned_queries_fpath =
             this_genned_queries_dpath.join(format!("{}.sql", query_i));