Skip to content
This repository has been archived by the owner on Jan 7, 2025. It is now read-only.

Commit

Permalink
feat: can load files > 1GB into postgres (#159)
Browse files Browse the repository at this point in the history
**Summary**: Files larger than 1GB can now be loaded by splitting them into
2MB chunks and issuing a separate `COPY FROM STDIN` for each chunk.

**Demo**:
JOB's `cast_info.csv` is 1.3GB and has 36244344 rows (image 1). All
36244344 rows are loaded successfully (image 2).
![Screenshot 2024-04-14 at 18 06
22](https://github.com/cmu-db/optd/assets/20631215/c97c6649-548e-488a-ae6f-a0f9c458044a)
![Screenshot 2024-04-14 at 18 06
36](https://github.com/cmu-db/optd/assets/20631215/99967024-5631-4e53-a2e1-c7f0088da1f2)

**Details**:
* Postgres does not allow statements that exceed 1GB in size. We
previously sent the entire file in a single statement, so for files
larger than 1GB Postgres cancelled the transaction.
* This is fixed by chunking files into 2MB chunks and doing a separate
`COPY FROM STDIN` statement for each.
* This also solves a second issue as a side effect: because a file is
now read chunk by chunk rather than all at once, we can load files
that would not fit in memory.
  • Loading branch information
wangpatrick57 authored Apr 15, 2024
1 parent 9380a7f commit a332018
Show file tree
Hide file tree
Showing 12 changed files with 304 additions and 162 deletions.
34 changes: 34 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions optd-perftest/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ statistical = "1.0"
prettytable-rs = "0.10"
serde = "1.0"
serde_json = "1.0"
test-case = "3.3"

[dev_dependencies]
assert_cmd = "2.0"
30 changes: 9 additions & 21 deletions optd-perftest/src/benchmark.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use crate::tpch::TpchConfig;
use crate::{job::JobConfig, tpch::TpchConfig};
use serde::{Deserialize, Serialize};

#[derive(Deserialize, Serialize)]
pub enum Benchmark {
#[allow(dead_code)]
Test,
Tpch(TpchConfig),
Job(JobConfig),
}

impl Benchmark {
Expand All @@ -25,40 +24,29 @@ impl Benchmark {
/// lowercase. To resolve the inconsistency, the names output by this function will
/// *not* contain uppercase letters.
pub fn get_dbname(&self) -> String {
let dbname = match self {
Self::Test => String::from("test"),
Self::Tpch(tpch_config) => {
format!("tpch_sf{}", tpch_config.scale_factor)
}
};
let fname = self.get_fname();
// since Postgres names cannot contain periods
let dbname = dbname.replace('.', "point");
let dbname = fname.replace('.', "point");
// due to the weird inconsistency with Postgres (see function comment)
dbname.to_lowercase()
}

/// Use this when you need a unique file name. The rules for file names are different from the
/// rules for database names, so this is a different function
/// Use this when you need a unique file name to deterministically describe the "data"
/// of the benchmark. The rules for file names are different from the rules for
/// database names, so this is a different function.
pub fn get_fname(&self) -> String {
match self {
Self::Test => String::from("test"),
Self::Tpch(tpch_config) => {
format!("tpch_sf{}", tpch_config.scale_factor)
}
Self::Job(_) => String::from("job"),
}
}

/// An ID is just a unique string identifying the benchmark
/// It's not always used in the same situations as get_dbname(), so it's a separate function
pub fn get_id(&self) -> String {
// the fact that it happens to return dbname is an implementation detail
self.get_dbname()
}

pub fn is_readonly(&self) -> bool {
match self {
Self::Test => true,
Self::Tpch(_) => true,
Self::Job(_) => true,
}
}
}
10 changes: 5 additions & 5 deletions optd-perftest/src/cardtest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::path::Path;

use crate::postgres_dbms::PostgresDBMS;
use crate::truecard::TruecardGetter;
use crate::{benchmark::Benchmark, datafusion_dbms::DatafusionDBMS, tpch::TpchConfig};
use crate::{benchmark::Benchmark, datafusion_dbms::DatafusionDBMS};

use anyhow::{self};
use async_trait::async_trait;
Expand Down Expand Up @@ -101,22 +101,22 @@ pub trait CardtestRunnerDBMSHelper {
) -> anyhow::Result<Vec<usize>>;
}

pub async fn cardtest<P: AsRef<Path>>(
/// The core logic of cardinality testing.
pub async fn cardtest_core<P: AsRef<Path>>(
workspace_dpath: P,
rebuild_cached_optd_stats: bool,
pguser: &str,
pgpassword: &str,
tpch_config: TpchConfig,
benchmark: Benchmark,
) -> anyhow::Result<HashMap<String, Vec<Cardinfo>>> {
let pg_dbms = Box::new(PostgresDBMS::build(&workspace_dpath, pguser, pgpassword)?);
let truecard_getter = pg_dbms.clone();
let df_dbms = Box::new(DatafusionDBMS::new(&workspace_dpath, rebuild_cached_optd_stats).await?);
let dbmss: Vec<Box<dyn CardtestRunnerDBMSHelper>> = vec![pg_dbms, df_dbms];

let tpch_benchmark = Benchmark::Tpch(tpch_config.clone());
let mut cardtest_runner = CardtestRunner::new(dbmss, truecard_getter).await?;
let cardinfos_alldbs = cardtest_runner
.eval_benchmark_cardinfos_alldbs(&tpch_benchmark)
.eval_benchmark_cardinfos_alldbs(&benchmark)
.await?;
Ok(cardinfos_alldbs)
}
2 changes: 1 addition & 1 deletion optd-perftest/src/datafusion_dbms.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ impl CardtestRunnerDBMSHelper for DatafusionDBMS {
let tpch_kit = TpchKit::build(&self.workspace_dpath)?;
self.create_tpch_tables(&tpch_kit).await?;
match benchmark {
Benchmark::Test => unimplemented!(),
Benchmark::Tpch(tpch_config) => self.eval_tpch_estcards(tpch_config).await,
Benchmark::Job(_job_config) => unimplemented!(),
}
}
}
Expand Down
18 changes: 11 additions & 7 deletions optd-perftest/src/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ const JOB_TABLES_URL: &str = "https://homepages.cwi.nl/~boncz/job/imdb.tgz";

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct JobConfig {
pub query_ids: Vec<u32>,
pub query_ids: Vec<String>,
}

impl Display for JobConfig {
Expand Down Expand Up @@ -128,13 +128,17 @@ impl JobKit {
pub fn get_sql_fpath_ordered_iter(
&self,
job_config: &JobConfig,
) -> io::Result<impl Iterator<Item = (u32, PathBuf)>> {
) -> io::Result<impl Iterator<Item = (String, PathBuf)>> {
let queries_dpath = self.queries_dpath.clone();
let sql_fpath_ordered_iter = job_config
.query_ids
.clone()
.into_iter()
.map(move |query_id| (query_id, queries_dpath.join(format!("{}.sql", query_id))));
let sql_fpath_ordered_iter =
job_config
.query_ids
.clone()
.into_iter()
.map(move |query_id| {
let this_genned_query_fpath = queries_dpath.join(format!("{}.sql", &query_id));
(query_id, this_genned_query_fpath)
});
Ok(sql_fpath_ordered_iter)
}
}
2 changes: 1 addition & 1 deletion optd-perftest/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
mod benchmark;
pub mod benchmark;
pub mod cardtest;
mod datafusion_dbms;
pub mod job;
Expand Down
Loading

0 comments on commit a332018

Please sign in to comment.