Skip to content
This repository was archived by the owner on Jan 7, 2025. It is now read-only.

Commit 7f87289

Browse files
authored
feat: implement card test for df optimizer (#105)
Implement card test interfaces for Datafusion optimizer. After #104, the cost of each plan node can be displayed using `explain verbose`. Utilizing this feature, we can extract the estimated cardinality of the root plan node. The true cardinality is computed by counting the rows of the result set. Also enable users to select queries to execute in perftest by specifying e.g. `--query-ids 2 3` ![image](https://github.com/cmu-db/optd/assets/52783948/00567813-d10d-4f99-9024-21baf4f5e4f9)
1 parent 7ccacfe commit 7f87289

File tree

7 files changed

+292
-34
lines changed

7 files changed

+292
-34
lines changed

Cargo.lock

Lines changed: 5 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

optd-perftest/Cargo.toml

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,17 @@ edition = "2021"
66
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
77

88
[dependencies]
9-
optd-sqlplannertest = { path = "../optd-sqlplannertest" }
9+
datafusion = { version = "32.0.0", features = [
10+
"avro",
11+
"crypto_expressions",
12+
"encoding_expressions",
13+
"regex_expressions",
14+
"unicode_expressions",
15+
"compression",
16+
] }
17+
optd-datafusion-repr = { path = "../optd-datafusion-repr" }
18+
optd-datafusion-bridge = { path = "../optd-datafusion-bridge" }
19+
datafusion-optd-cli = { path = "../datafusion-optd-cli" }
1020
futures = "0.3"
1121
anyhow = { version = "1", features = ["backtrace"] }
1222
async-trait = "0.1"
@@ -25,3 +35,4 @@ clap = { version = "4.5", features = [
2535
] }
2636
log = "0.4"
2737
env_logger = "0.11"
38+
lazy_static = "1.4.0"

optd-perftest/src/cardtest.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,12 @@ pub trait CardtestRunnerDBHelper {
7777
fn get_name(&self) -> &str;
7878

7979
// The order of queries has to be the same between these two functions.
80-
async fn eval_benchmark_estcards(&self, benchmark: &Benchmark) -> anyhow::Result<Vec<usize>>;
81-
async fn eval_benchmark_truecards(&self, benchmark: &Benchmark) -> anyhow::Result<Vec<usize>>;
80+
async fn eval_benchmark_estcards(
81+
&mut self,
82+
benchmark: &Benchmark,
83+
) -> anyhow::Result<Vec<usize>>;
84+
async fn eval_benchmark_truecards(
85+
&mut self,
86+
benchmark: &Benchmark,
87+
) -> anyhow::Result<Vec<usize>>;
8288
}
Lines changed: 231 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,246 @@
1-
use crate::{benchmark::Benchmark, cardtest::CardtestRunnerDBHelper};
1+
use std::{
2+
fs,
3+
path::{Path, PathBuf},
4+
sync::Arc,
5+
};
6+
7+
use crate::{
8+
benchmark::Benchmark,
9+
cardtest::CardtestRunnerDBHelper,
10+
tpch::{TpchConfig, TpchKit},
11+
};
212
use async_trait::async_trait;
3-
use optd_sqlplannertest::DatafusionDb;
13+
use datafusion::{
14+
arrow::util::display::{ArrayFormatter, FormatOptions},
15+
execution::{
16+
config::SessionConfig,
17+
context::{SessionContext, SessionState},
18+
runtime_env::{RuntimeConfig, RuntimeEnv},
19+
},
20+
sql::{parser::DFParser, sqlparser::dialect::GenericDialect},
21+
};
22+
use datafusion_optd_cli::helper::unescape_input;
23+
use lazy_static::lazy_static;
24+
use optd_datafusion_bridge::{DatafusionCatalog, OptdQueryPlanner};
25+
use optd_datafusion_repr::DatafusionOptimizer;
26+
use regex::Regex;
27+
28+
pub struct DatafusionDb {
29+
workspace_dpath: PathBuf,
30+
ctx: SessionContext,
31+
}
432

533
#[async_trait]
634
impl CardtestRunnerDBHelper for DatafusionDb {
735
fn get_name(&self) -> &str {
836
"DataFusion"
937
}
1038

11-
async fn eval_benchmark_truecards(&self, _benchmark: &Benchmark) -> anyhow::Result<Vec<usize>> {
12-
Ok(vec![])
39+
async fn eval_benchmark_estcards(
40+
&mut self,
41+
benchmark: &Benchmark,
42+
) -> anyhow::Result<Vec<usize>> {
43+
self.clear_state().await?;
44+
self.load_benchmark_data(benchmark).await?;
45+
match benchmark {
46+
Benchmark::Test => unimplemented!(),
47+
Benchmark::Tpch(tpch_config) => self.eval_tpch_estcards(tpch_config).await,
48+
}
1349
}
1450

15-
async fn eval_benchmark_estcards(&self, _benchmark: &Benchmark) -> anyhow::Result<Vec<usize>> {
16-
Ok(vec![])
51+
async fn eval_benchmark_truecards(
52+
&mut self,
53+
benchmark: &Benchmark,
54+
) -> anyhow::Result<Vec<usize>> {
55+
self.clear_state().await?;
56+
self.load_benchmark_data(benchmark).await?;
57+
match benchmark {
58+
Benchmark::Test => unimplemented!(),
59+
Benchmark::Tpch(tpch_config) => self.eval_tpch_truecards(tpch_config).await,
60+
}
1761
}
1862
}
1963

20-
// helper functions for ```impl CardtestRunnerDBHelper for DatafusionDb```
21-
// they can't be put in an impl because DatafusionDb is a foreign struct
22-
async fn _eval_query_truecard(db: &DatafusionDb, sql: &str) -> anyhow::Result<usize> {
23-
let rows = db.execute(sql, true).await?;
24-
let num_rows = rows.len();
25-
Ok(num_rows)
26-
}
64+
impl DatafusionDb {
65+
pub async fn new<P: AsRef<Path>>(workspace_dpath: P) -> anyhow::Result<Self> {
66+
Ok(DatafusionDb {
67+
workspace_dpath: workspace_dpath.as_ref().to_path_buf(),
68+
ctx: Self::new_session_ctx().await?,
69+
})
70+
}
71+
72+
/// Reset data and metadata.
73+
async fn clear_state(&mut self) -> anyhow::Result<()> {
74+
self.ctx = Self::new_session_ctx().await?;
75+
Ok(())
76+
}
77+
78+
async fn new_session_ctx() -> anyhow::Result<SessionContext> {
79+
let session_config = SessionConfig::from_env()?.with_information_schema(true);
80+
let rn_config = RuntimeConfig::new();
81+
let runtime_env = RuntimeEnv::new(rn_config.clone())?;
82+
let ctx = {
83+
let mut state =
84+
SessionState::new_with_config_rt(session_config.clone(), Arc::new(runtime_env));
85+
let optimizer = DatafusionOptimizer::new_physical(Arc::new(DatafusionCatalog::new(
86+
state.catalog_list(),
87+
)));
88+
state = state.with_physical_optimizer_rules(vec![]);
89+
state = state.with_query_planner(Arc::new(OptdQueryPlanner::new(optimizer)));
90+
SessionContext::new_with_state(state)
91+
};
92+
ctx.refresh_catalogs().await?;
93+
Ok(ctx)
94+
}
2795

28-
async fn _eval_query_estcard(db: &DatafusionDb, _sql: &str) -> anyhow::Result<usize> {
29-
let rows = db.execute("EXPLAIN SELECT * FROM t1;", true).await?;
30-
println!("eval_est_card(): rows={:?}", rows);
31-
Ok(12)
96+
async fn execute(&self, sql: &str) -> anyhow::Result<Vec<Vec<String>>> {
97+
let sql = unescape_input(sql)?;
98+
let dialect = Box::new(GenericDialect);
99+
let statements = DFParser::parse_sql_with_dialect(&sql, dialect.as_ref())?;
100+
let mut result = Vec::new();
101+
for statement in statements {
102+
let df = {
103+
let plan = self.ctx.state().statement_to_plan(statement).await?;
104+
self.ctx.execute_logical_plan(plan).await?
105+
};
106+
107+
let batches = df.collect().await?;
108+
109+
let options = FormatOptions::default();
110+
111+
for batch in batches {
112+
let converters = batch
113+
.columns()
114+
.iter()
115+
.map(|a| ArrayFormatter::try_new(a.as_ref(), &options))
116+
.collect::<Result<Vec<_>, _>>()?;
117+
for row_idx in 0..batch.num_rows() {
118+
let mut row = Vec::with_capacity(batch.num_columns());
119+
for converter in converters.iter() {
120+
let mut buffer = String::with_capacity(8);
121+
converter.value(row_idx).write(&mut buffer)?;
122+
row.push(buffer);
123+
}
124+
result.push(row);
125+
}
126+
}
127+
}
128+
Ok(result)
129+
}
130+
131+
async fn eval_tpch_estcards(&self, tpch_config: &TpchConfig) -> anyhow::Result<Vec<usize>> {
132+
let tpch_kit = TpchKit::build(&self.workspace_dpath)?;
133+
tpch_kit.gen_queries(tpch_config)?;
134+
135+
let mut estcards = vec![];
136+
for sql_fpath in tpch_kit.get_sql_fpath_ordered_iter(tpch_config)? {
137+
let sql = fs::read_to_string(sql_fpath)?;
138+
let estcard = self.eval_query_estcard(&sql).await?;
139+
estcards.push(estcard);
140+
}
141+
142+
Ok(estcards)
143+
}
144+
145+
async fn eval_tpch_truecards(&self, tpch_config: &TpchConfig) -> anyhow::Result<Vec<usize>> {
146+
let tpch_kit = TpchKit::build(&self.workspace_dpath)?;
147+
tpch_kit.gen_queries(tpch_config)?;
148+
149+
let mut truecards = vec![];
150+
for sql_fpath in tpch_kit.get_sql_fpath_ordered_iter(tpch_config)? {
151+
let sql = fs::read_to_string(sql_fpath)?;
152+
let estcard = self.eval_query_truecard(&sql).await?;
153+
truecards.push(estcard);
154+
}
155+
156+
Ok(truecards)
157+
}
158+
159+
async fn eval_query_truecard(&self, sql: &str) -> anyhow::Result<usize> {
160+
let rows = self.execute(sql).await?;
161+
let num_rows = rows.len();
162+
Ok(num_rows)
163+
}
164+
165+
async fn eval_query_estcard(&self, sql: &str) -> anyhow::Result<usize> {
166+
lazy_static! {
167+
static ref ROW_CNT_RE: Regex = Regex::new(r"row_cnt=(\d+\.\d+)").unwrap();
168+
}
169+
let explains = self.execute(&format!("explain verbose {}", sql)).await?;
170+
// Find first occurrence of row_cnt=... in the output.
171+
let row_cnt = explains
172+
.iter()
173+
.find_map(|explain| {
174+
// First element is task name, second is the actual explain output.
175+
assert!(explain.len() == 2);
176+
let explain = &explain[1];
177+
if let Some(caps) = ROW_CNT_RE.captures(explain) {
178+
caps.get(1)
179+
.map(|row_cnt| row_cnt.as_str().parse::<f32>().unwrap() as usize)
180+
} else {
181+
None
182+
}
183+
})
184+
.unwrap();
185+
Ok(row_cnt)
186+
}
187+
188+
async fn load_benchmark_data(&mut self, benchmark: &Benchmark) -> anyhow::Result<()> {
189+
match benchmark {
190+
Benchmark::Tpch(tpch_config) => self.load_tpch_data(tpch_config).await,
191+
_ => unimplemented!(),
192+
}
193+
}
194+
195+
async fn load_tpch_data(&mut self, tpch_config: &TpchConfig) -> anyhow::Result<()> {
196+
let tpch_kit = TpchKit::build(&self.workspace_dpath)?;
197+
tpch_kit.gen_tables(tpch_config)?;
198+
199+
// Create the tables.
200+
let ddls = fs::read_to_string(&tpch_kit.schema_fpath)?;
201+
let ddls = ddls
202+
.split(';')
203+
.map(|s| s.trim())
204+
.filter(|s| !s.is_empty())
205+
.collect::<Vec<_>>();
206+
for ddl in ddls {
207+
self.execute(ddl).await?;
208+
}
209+
210+
// Load the data by creating an external table first and copying the data to real tables.
211+
let tbl_fpath_iter = tpch_kit.get_tbl_fpath_iter(tpch_config).unwrap();
212+
for tbl_fpath in tbl_fpath_iter {
213+
let tbl_name = tbl_fpath.file_stem().unwrap().to_str().unwrap();
214+
self.execute(&format!(
215+
"create external table {}_tbl stored as csv delimiter '|' location '{}';",
216+
tbl_name,
217+
tbl_fpath.to_str().unwrap()
218+
))
219+
.await?;
220+
221+
// Get the number of columns of this table.
222+
let schema = self
223+
.ctx
224+
.catalog("datafusion")
225+
.unwrap()
226+
.schema("public")
227+
.unwrap()
228+
.table(tbl_name)
229+
.await
230+
.unwrap()
231+
.schema();
232+
let projection_list = (1..=schema.fields().len())
233+
.map(|i| format!("column_{}", i))
234+
.collect::<Vec<_>>()
235+
.join(", ");
236+
self.execute(&format!(
237+
"insert into {} select {} from {}_tbl;",
238+
tbl_name, projection_list, tbl_name,
239+
))
240+
.await?;
241+
}
242+
Ok(())
243+
}
32244
}
245+
246+
unsafe impl Send for DatafusionDb {}

optd-perftest/src/main.rs

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use postgres_db::PostgresDb;
66

77
use crate::{
88
benchmark::Benchmark,
9+
datafusion_db_cardtest::DatafusionDb,
910
tpch::{TpchConfig, TPCH_KIT_POSTGRES},
1011
};
1112

@@ -37,6 +38,9 @@ enum Commands {
3738
#[arg(long)]
3839
#[clap(default_value = "15721")]
3940
seed: i32,
41+
#[arg(long)]
42+
#[clap(value_delimiter = ' ', num_args = 1..)]
43+
query_ids: Vec<u32>,
4044
},
4145
}
4246

@@ -50,24 +54,30 @@ async fn main() -> anyhow::Result<()> {
5054
fs::create_dir(&workspace_dpath)?;
5155
}
5256

53-
match &cli.command {
54-
Commands::Cardtest { scale_factor, seed } => {
57+
match cli.command {
58+
Commands::Cardtest {
59+
scale_factor,
60+
seed,
61+
query_ids,
62+
} => {
5563
let tpch_config = TpchConfig {
5664
database: String::from(TPCH_KIT_POSTGRES),
57-
scale_factor: *scale_factor,
58-
seed: *seed,
65+
scale_factor,
66+
seed,
67+
query_ids,
5968
};
6069
cardtest(&workspace_dpath, tpch_config).await
6170
}
6271
}
6372
}
6473

65-
async fn cardtest<P: AsRef<Path>>(
74+
async fn cardtest<P: AsRef<Path> + Clone>(
6675
workspace_dpath: P,
6776
tpch_config: TpchConfig,
6877
) -> anyhow::Result<()> {
69-
let pg_db = PostgresDb::new(workspace_dpath);
70-
let databases: Vec<Box<dyn CardtestRunnerDBHelper>> = vec![Box::new(pg_db)];
78+
let pg_db = PostgresDb::new(workspace_dpath.clone());
79+
let df_db = DatafusionDb::new(workspace_dpath).await?;
80+
let databases: Vec<Box<dyn CardtestRunnerDBHelper>> = vec![Box::new(pg_db), Box::new(df_db)];
7181

7282
let tpch_benchmark = Benchmark::Tpch(tpch_config.clone());
7383
let mut cardtest_runner = CardtestRunner::new(databases).await?;

0 commit comments

Comments
 (0)