diff --git a/Cargo.lock b/Cargo.lock index 99e04f9a..443e083b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2753,6 +2753,7 @@ dependencies = [ "num-traits", "ordered-float 4.2.0", "pretty-xmlish", + "serde", "tracing", "tracing-subscriber", ] @@ -2806,6 +2807,7 @@ dependencies = [ "crossbeam", "itertools", "rand 0.8.5", + "serde", ] [[package]] diff --git a/optd-core/Cargo.toml b/optd-core/Cargo.toml index 36f927ca..bfa4b261 100644 --- a/optd-core/Cargo.toml +++ b/optd-core/Cargo.toml @@ -14,3 +14,4 @@ ordered-float = "4" tracing-subscriber = "0.3" pretty-xmlish = "0.1" itertools = "0.11" +serde = {version = "1.0", features = ["derive", "rc"]} \ No newline at end of file diff --git a/optd-core/src/rel_node.rs b/optd-core/src/rel_node.rs index f12a7a26..a140ebf7 100644 --- a/optd-core/src/rel_node.rs +++ b/optd-core/src/rel_node.rs @@ -9,6 +9,7 @@ use std::{ }; use ordered_float::OrderedFloat; +use serde::{Deserialize, Deserializer, Serialize, Serializer}; use crate::{cascades::GroupId, cost::Cost}; @@ -27,6 +28,30 @@ pub trait RelNodeTyp: } #[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub struct SerializableOrderedF64(pub OrderedFloat); + +impl Serialize for SerializableOrderedF64 { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + // Directly serialize the inner f64 value of the OrderedFloat + self.0 .0.serialize(serializer) + } +} + +impl<'de> Deserialize<'de> for SerializableOrderedF64 { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + // Deserialize an f64 and wrap it in an OrderedFloat + let float = f64::deserialize(deserializer)?; + Ok(SerializableOrderedF64(OrderedFloat(float))) + } +} + +#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] pub enum Value { UInt8(u8), UInt16(u16), @@ -37,7 +62,7 @@ pub enum Value { Int32(i32), Int64(i64), Int128(i128), - Float(OrderedFloat), + Float(SerializableOrderedF64), String(Arc), Bool(bool), Date32(i32), @@ -57,7 +82,7 @@ impl std::fmt::Display for Value { Self::Int32(x) => write!(f, "{x}"), Self::Int64(x) => write!(f, "{x}"), Self::Int128(x) => write!(f, "{x}"), - Self::Float(x) => write!(f, "{x}"), + Self::Float(x) => write!(f, "{}", x.0), Self::String(x) => write!(f, "\"{x}\""), Self::Bool(x) => write!(f, "{x}"), Self::Date32(x) => write!(f, "{x}"), @@ -133,7 +158,7 @@ impl Value { pub fn as_f64(&self) -> f64 { match self { - Value::Float(i) => **i, + Value::Float(i) => *i.0, _ => panic!("Value is not an f64"), } } diff --git a/optd-datafusion-repr/src/bin/test_optimize.rs b/optd-datafusion-repr/src/bin/test_optimize.rs index 7aa894f0..6eb5deb3 100644 --- a/optd-datafusion-repr/src/bin/test_optimize.rs +++ b/optd-datafusion-repr/src/bin/test_optimize.rs @@ -8,7 +8,7 @@ use optd_core::{ rules::{Rule, RuleWrapper}, }; use optd_datafusion_repr::{ - cost::{OptCostModel, PerTableStats}, + cost::{base_cost::DataFusionPerTableStats, OptCostModel}, plan_nodes::{ BinOpExpr, BinOpType, ColumnRefExpr, ConstantExpr, JoinType, LogicalFilter, LogicalJoin, LogicalScan, OptRelNode, OptRelNodeTyp, PlanNode, @@ -45,7 +45,7 @@ pub fn main() { Box::new(OptCostModel::new( [("t1", 1000), ("t2", 100), ("t3", 10000)] .into_iter() - .map(|(x, y)| (x.to_string(), PerTableStats::new(y, vec![]))) + .map(|(x, y)| (x.to_string(), DataFusionPerTableStats::new(y, vec![]))) .collect(), )), vec![], diff --git a/optd-datafusion-repr/src/cost.rs b/optd-datafusion-repr/src/cost.rs index 212b3331..df4da961 100644 --- a/optd-datafusion-repr/src/cost.rs +++ b/optd-datafusion-repr/src/cost.rs @@ -1,5 +1,5 @@ -mod adaptive_cost; -mod base_cost; +pub mod adaptive_cost; +pub mod base_cost; mod stats; pub use adaptive_cost::{AdaptiveCostModel, RuntimeAdaptionStorage, DEFAULT_DECAY}; diff --git a/optd-datafusion-repr/src/cost/adaptive_cost.rs b/optd-datafusion-repr/src/cost/adaptive_cost.rs index 35625782..dd29edc6 100644 --- a/optd-datafusion-repr/src/cost/adaptive_cost.rs +++ b/optd-datafusion-repr/src/cost/adaptive_cost.rs @@ -10,9 +10,14 @@ use optd_core::{ rel_node::{RelNode, Value}, }; -use super::base_cost::BaseTableStats; +use super::base_cost::{ + BaseTableStats, DataFusionDistribution, DataFusionMostCommonValues, Distribution, + MostCommonValues, +}; pub type RuntimeAdaptionStorage = Arc>; +pub type DataFusionAdaptiveCostModel = + AdaptiveCostModel; #[derive(Default, Debug)] pub struct RuntimeAdaptionStorageInner { @@ -22,13 +27,13 @@ pub struct RuntimeAdaptionStorageInner { pub const DEFAULT_DECAY: usize = 50; -pub struct AdaptiveCostModel { +pub struct AdaptiveCostModel { runtime_row_cnt: RuntimeAdaptionStorage, - base_model: OptCostModel, + base_model: OptCostModel, decay: usize, } -impl CostModel for AdaptiveCostModel { +impl CostModel for AdaptiveCostModel { fn explain(&self, cost: &Cost) -> String { self.base_model.explain(cost) } @@ -56,11 +61,11 @@ impl CostModel for AdaptiveCostModel { { if *iter + self.decay >= guard.iter_cnt { let runtime_row_cnt = (*runtime_row_cnt).max(1) as f64; - return OptCostModel::cost(runtime_row_cnt, 0.0, runtime_row_cnt); + return OptCostModel::::cost(runtime_row_cnt, 0.0, runtime_row_cnt); } } } - let (mut row_cnt, compute_cost, io_cost) = OptCostModel::cost_tuple( + let (mut row_cnt, compute_cost, io_cost) = OptCostModel::::cost_tuple( &self .base_model .compute_cost(node, data, children, context.clone(), optimizer), @@ -74,7 +79,7 @@ impl CostModel for AdaptiveCostModel { } } } - OptCostModel::cost(row_cnt, compute_cost, io_cost) + OptCostModel::::cost(row_cnt, compute_cost, io_cost) } fn compute_plan_node_cost(&self, node: &RelNode) -> Cost { @@ -82,8 +87,8 @@ impl CostModel for AdaptiveCostModel { } } -impl AdaptiveCostModel { - pub fn new(decay: usize, stats: BaseTableStats) -> Self { +impl AdaptiveCostModel { + pub fn new(decay: usize, stats: BaseTableStats) -> Self { Self { runtime_row_cnt: RuntimeAdaptionStorage::default(), base_model: OptCostModel::new(stats), diff --git a/optd-datafusion-repr/src/cost/base_cost.rs b/optd-datafusion-repr/src/cost/base_cost.rs index 3f5a6de8..d956f867 100644 --- a/optd-datafusion-repr/src/cost/base_cost.rs +++ b/optd-datafusion-repr/src/cost/base_cost.rs @@ -20,6 +20,7 @@ use optd_core::{ }; use optd_gungnir::stats::hyperloglog::{self, HyperLogLog}; use optd_gungnir::stats::tdigest::{self, TDigest}; +use serde::{Deserialize, Serialize}; fn compute_plan_node_cost>( model: &C, @@ -36,13 +37,25 @@ fn compute_plan_node_cost>( cost } -pub type BaseTableStats = HashMap; - -pub struct OptCostModel { - per_table_stats_map: BaseTableStats, +pub type BaseTableStats = HashMap>; + +// The "standard" concrete types that optd currently uses +// All of optd (except unit tests) must use the same types +pub type DataFusionMostCommonValues = MockMostCommonValues; +pub type DataFusionDistribution = TDigest; +pub type DataFusionBaseTableStats = + BaseTableStats; +pub type DataFusionPerTableStats = + PerTableStats; +pub type DataFusionPerColumnStats = + PerColumnStats; + +pub struct OptCostModel { + per_table_stats_map: BaseTableStats, } -struct MockMostCommonValues { +#[derive(Serialize, Deserialize)] +pub struct MockMostCommonValues { mcvs: HashMap, } @@ -76,12 +89,13 @@ impl MostCommonValues for MockMostCommonValues { } } -pub struct PerTableStats { +#[derive(Serialize, Deserialize)] +pub struct PerTableStats { row_cnt: usize, - per_column_stats_vec: Vec>, + per_column_stats_vec: Vec>>, } -impl PerTableStats { +impl DataFusionPerTableStats { pub fn from_record_batches>>( batch_iter: RecordBatchIterator, ) -> anyhow::Result { @@ -137,12 +151,12 @@ impl PerTableStats { let mut per_column_stats_vec = Vec::with_capacity(col_cnt); for i in 0..col_cnt { per_column_stats_vec.push(if Self::is_type_supported(&col_types[i]) { - Some(PerColumnStats { - mcvs: Box::new(mcvs[i].take().unwrap()) as Box, - ndistinct: hlls[i].n_distinct(), - null_frac: null_cnt[i] as f64 / row_cnt as f64, - distr: Box::new(distr[i].take().unwrap()) as Box, - }) + Some(PerColumnStats::new( + mcvs[i].take().unwrap(), + hlls[i].n_distinct(), + null_cnt[i] as f64 / row_cnt as f64, + distr[i].take().unwrap(), + )) } else { None }); @@ -245,9 +259,10 @@ impl PerTableStats { } } -pub struct PerColumnStats { +#[derive(Serialize, Deserialize)] +pub struct PerColumnStats { // even if nulls are the most common, they cannot appear in mcvs - mcvs: Box, + mcvs: M, // ndistinct _does_ include the values in mcvs // ndistinct _does not_ include nulls @@ -259,7 +274,18 @@ pub struct PerColumnStats { // distribution _does not_ include the values in mcvs // distribution _does not_ include nulls - distr: Box, + distr: D, +} + +impl PerColumnStats { + pub fn new(mcvs: M, ndistinct: u64, null_frac: f64, distr: D) -> Self { + Self { + mcvs, + ndistinct, + null_frac, + distr, + } + } } pub trait MostCommonValues: 'static + Send + Sync { @@ -291,7 +317,7 @@ pub const IO_COST: usize = 3; // TODO: a future PR will remove this and get the code working for all of TPC-H const INVALID_SELECTIVITY: f64 = 0.001; -impl OptCostModel { +impl OptCostModel { pub fn row_cnt(Cost(cost): &Cost) -> f64 { cost[ROW_COUNT] } @@ -323,7 +349,7 @@ impl OptCostModel { } } -impl CostModel for OptCostModel { +impl CostModel for OptCostModel { fn explain(&self, cost: &Cost) -> String { format!( "weighted={},row_cnt={},compute={},io={}", @@ -471,8 +497,8 @@ impl CostModel for OptCostModel { } } -impl OptCostModel { - pub fn new(per_table_stats_map: BaseTableStats) -> Self { +impl OptCostModel { + pub fn new(per_table_stats_map: BaseTableStats) -> Self { Self { per_table_stats_map, } @@ -762,8 +788,8 @@ impl OptCostModel { } } -impl PerTableStats { - pub fn new(row_cnt: usize, per_column_stats_vec: Vec>) -> Self { +impl PerTableStats { + pub fn new(row_cnt: usize, per_column_stats_vec: Vec>>) -> Self { Self { row_cnt, per_column_stats_vec, @@ -771,22 +797,6 @@ impl PerTableStats { } } -impl PerColumnStats { - pub fn new( - mcvs: Box, - ndistinct: u64, - null_frac: f64, - distr: Box, - ) -> Self { - Self { - mcvs, - ndistinct, - null_frac, - distr, - } - } -} - /// I thought about using the system's own parser and planner to generate these expression trees, but /// this is not currently feasible because it would create a cyclic dependency between optd-datafusion-bridge /// and optd-datafusion-repr @@ -804,16 +814,17 @@ mod tests { }; use super::{Distribution, MostCommonValues, OptCostModel, PerColumnStats, PerTableStats}; + type TestPerColumnStats = PerColumnStats; - struct MockMostCommonValues { + struct TestMostCommonValues { mcvs: HashMap, } - struct MockDistribution { + struct TestDistribution { cdfs: HashMap, } - impl MockMostCommonValues { + impl TestMostCommonValues { fn new(mcvs_vec: Vec<(Value, f64)>) -> Self { Self { mcvs: mcvs_vec.into_iter().collect(), @@ -821,11 +832,11 @@ mod tests { } pub fn empty() -> Self { - MockMostCommonValues::new(vec![]) + TestMostCommonValues::new(vec![]) } } - impl MostCommonValues for MockMostCommonValues { + impl MostCommonValues for TestMostCommonValues { fn freq(&self, value: &Value) -> Option { self.mcvs.get(value).copied() } @@ -847,7 +858,7 @@ mod tests { } } - impl MockDistribution { + impl TestDistribution { fn new(cdfs_vec: Vec<(Value, f64)>) -> Self { Self { cdfs: cdfs_vec.into_iter().collect(), @@ -855,11 +866,11 @@ mod tests { } fn empty() -> Self { - MockDistribution::new(vec![]) + TestDistribution::new(vec![]) } } - impl Distribution for MockDistribution { + impl Distribution for TestDistribution { fn cdf(&self, value: &Value) -> f64 { *self.cdfs.get(value).unwrap_or(&0.0) } @@ -868,7 +879,9 @@ mod tests { const TABLE1_NAME: &str = "t1"; // one column is sufficient for all filter selectivity predicates - fn create_one_column_cost_model(per_column_stats: PerColumnStats) -> OptCostModel { + fn create_one_column_cost_model( + per_column_stats: TestPerColumnStats, + ) -> OptCostModel { OptCostModel::new( vec![( String::from(TABLE1_NAME), @@ -923,11 +936,11 @@ mod tests { #[test] fn test_colref_eq_constint_in_mcv() { - let cost_model = create_one_column_cost_model(PerColumnStats::new( - Box::new(MockMostCommonValues::new(vec![(Value::Int32(1), 0.3)])), + let cost_model = create_one_column_cost_model(TestPerColumnStats::new( + TestMostCommonValues::new(vec![(Value::Int32(1), 0.3)]), 0, 0.0, - Box::new(MockDistribution::empty()), + TestDistribution::empty(), )); let expr_tree = bin_op(BinOpType::Eq, col_ref(0), cnst(Value::Int32(1))); let expr_tree_rev = bin_op(BinOpType::Eq, cnst(Value::Int32(1)), col_ref(0)); @@ -947,14 +960,11 @@ mod tests { #[test] fn test_colref_eq_constint_not_in_mcv_no_nulls() { - let cost_model = create_one_column_cost_model(PerColumnStats::new( - Box::new(MockMostCommonValues::new(vec![ - (Value::Int32(1), 0.2), - (Value::Int32(3), 0.44), - ])), + let cost_model = create_one_column_cost_model(TestPerColumnStats::new( + TestMostCommonValues::new(vec![(Value::Int32(1), 0.2), (Value::Int32(3), 0.44)]), 5, 0.0, - Box::new(MockDistribution::empty()), + TestDistribution::empty(), )); let expr_tree = bin_op(BinOpType::Eq, col_ref(0), cnst(Value::Int32(2))); let expr_tree_rev = bin_op(BinOpType::Eq, cnst(Value::Int32(2)), col_ref(0)); @@ -974,14 +984,11 @@ mod tests { #[test] fn test_colref_eq_constint_not_in_mcv_with_nulls() { - let cost_model = create_one_column_cost_model(PerColumnStats::new( - Box::new(MockMostCommonValues::new(vec![ - (Value::Int32(1), 0.2), - (Value::Int32(3), 0.44), - ])), + let cost_model = create_one_column_cost_model(TestPerColumnStats::new( + TestMostCommonValues::new(vec![(Value::Int32(1), 0.2), (Value::Int32(3), 0.44)]), 5, 0.03, - Box::new(MockDistribution::empty()), + TestDistribution::empty(), )); let expr_tree = bin_op(BinOpType::Eq, col_ref(0), cnst(Value::Int32(2))); let expr_tree_rev = bin_op(BinOpType::Eq, cnst(Value::Int32(2)), col_ref(0)); @@ -1002,11 +1009,11 @@ mod tests { /// I only have one test for NEQ since I'll assume that it uses the same underlying logic as EQ #[test] fn test_colref_neq_constint_in_mcv() { - let cost_model = create_one_column_cost_model(PerColumnStats::new( - Box::new(MockMostCommonValues::new(vec![(Value::Int32(1), 0.3)])), + let cost_model = create_one_column_cost_model(TestPerColumnStats::new( + TestMostCommonValues::new(vec![(Value::Int32(1), 0.3)]), 0, 0.0, - Box::new(MockDistribution::empty()), + TestDistribution::empty(), )); let expr_tree = bin_op(BinOpType::Neq, col_ref(0), cnst(Value::Int32(1))); let expr_tree_rev = bin_op(BinOpType::Neq, cnst(Value::Int32(1)), col_ref(0)); @@ -1026,11 +1033,11 @@ mod tests { #[test] fn test_colref_leq_constint_no_mcvs_in_range() { - let cost_model = create_one_column_cost_model(PerColumnStats::new( - Box::new(MockMostCommonValues::empty()), + let cost_model = create_one_column_cost_model(TestPerColumnStats::new( + TestMostCommonValues::empty(), 10, 0.0, - Box::new(MockDistribution::new(vec![(Value::Int32(15), 0.7)])), + TestDistribution::new(vec![(Value::Int32(15), 0.7)]), )); let expr_tree = bin_op(BinOpType::Leq, col_ref(0), cnst(Value::Int32(15))); let expr_tree_rev = bin_op(BinOpType::Geq, cnst(Value::Int32(15)), col_ref(0)); @@ -1050,11 +1057,11 @@ mod tests { #[test] fn test_colref_leq_constint_no_mcvs_in_range_with_nulls() { - let cost_model = create_one_column_cost_model(PerColumnStats::new( - Box::new(MockMostCommonValues::empty()), + let cost_model = create_one_column_cost_model(TestPerColumnStats::new( + TestMostCommonValues::empty(), 10, 0.1, - Box::new(MockDistribution::new(vec![(Value::Int32(15), 0.7)])), + TestDistribution::new(vec![(Value::Int32(15), 0.7)]), )); let expr_tree = bin_op(BinOpType::Leq, col_ref(0), cnst(Value::Int32(15))); let expr_tree_rev = bin_op(BinOpType::Geq, cnst(Value::Int32(15)), col_ref(0)); @@ -1074,8 +1081,8 @@ mod tests { #[test] fn test_colref_leq_constint_with_mcvs_in_range_not_at_border() { - let cost_model = create_one_column_cost_model(PerColumnStats::new( - Box::new(MockMostCommonValues { + let cost_model = create_one_column_cost_model(TestPerColumnStats::new( + TestMostCommonValues { mcvs: vec![ (Value::Int32(6), 0.05), (Value::Int32(10), 0.1), @@ -1084,10 +1091,10 @@ mod tests { ] .into_iter() .collect(), - }), + }, 10, 0.0, - Box::new(MockDistribution::new(vec![(Value::Int32(15), 0.7)])), + TestDistribution::new(vec![(Value::Int32(15), 0.7)]), )); let expr_tree = bin_op(BinOpType::Leq, col_ref(0), cnst(Value::Int32(15))); let expr_tree_rev = bin_op(BinOpType::Geq, cnst(Value::Int32(15)), col_ref(0)); @@ -1107,16 +1114,16 @@ mod tests { #[test] fn test_colref_leq_constint_with_mcv_at_border() { - let cost_model = create_one_column_cost_model(PerColumnStats::new( - Box::new(MockMostCommonValues::new(vec![ + let cost_model = create_one_column_cost_model(TestPerColumnStats::new( + TestMostCommonValues::new(vec![ (Value::Int32(6), 0.05), (Value::Int32(10), 0.1), (Value::Int32(15), 0.08), (Value::Int32(25), 0.07), - ])), + ]), 10, 0.0, - Box::new(MockDistribution::new(vec![(Value::Int32(15), 0.7)])), + TestDistribution::new(vec![(Value::Int32(15), 0.7)]), )); let expr_tree = bin_op(BinOpType::Leq, col_ref(0), cnst(Value::Int32(15))); let expr_tree_rev = bin_op(BinOpType::Geq, cnst(Value::Int32(15)), col_ref(0)); @@ -1136,11 +1143,11 @@ mod tests { #[test] fn test_colref_lt_constint_no_mcvs_in_range() { - let cost_model = create_one_column_cost_model(PerColumnStats::new( - Box::new(MockMostCommonValues::empty()), + let cost_model = create_one_column_cost_model(TestPerColumnStats::new( + TestMostCommonValues::empty(), 10, 0.0, - Box::new(MockDistribution::new(vec![(Value::Int32(15), 0.7)])), + TestDistribution::new(vec![(Value::Int32(15), 0.7)]), )); let expr_tree = bin_op(BinOpType::Lt, col_ref(0), cnst(Value::Int32(15))); let expr_tree_rev = bin_op(BinOpType::Gt, cnst(Value::Int32(15)), col_ref(0)); @@ -1160,11 +1167,11 @@ mod tests { #[test] fn test_colref_lt_constint_no_mcvs_in_range_with_nulls() { - let cost_model = create_one_column_cost_model(PerColumnStats::new( - Box::new(MockMostCommonValues::empty()), + let cost_model = create_one_column_cost_model(TestPerColumnStats::new( + TestMostCommonValues::empty(), 9, // 90% of the values aren't nulls since null_frac = 0.1. if there are 9 distinct non-null values, each will have 0.1 frequency 0.1, - Box::new(MockDistribution::new(vec![(Value::Int32(15), 0.7)])), + TestDistribution::new(vec![(Value::Int32(15), 0.7)]), )); let expr_tree = bin_op(BinOpType::Lt, col_ref(0), cnst(Value::Int32(15))); let expr_tree_rev = bin_op(BinOpType::Gt, cnst(Value::Int32(15)), col_ref(0)); @@ -1184,8 +1191,8 @@ mod tests { #[test] fn test_colref_lt_constint_with_mcvs_in_range_not_at_border() { - let cost_model = create_one_column_cost_model(PerColumnStats::new( - Box::new(MockMostCommonValues { + let cost_model = create_one_column_cost_model(TestPerColumnStats::new( + TestMostCommonValues { mcvs: vec![ (Value::Int32(6), 0.05), (Value::Int32(10), 0.1), @@ -1194,10 +1201,10 @@ mod tests { ] .into_iter() .collect(), - }), + }, 11, // there are 4 MCVs which together add up to 0.3. With 11 total ndistinct, each remaining value has freq 0.1 0.0, - Box::new(MockDistribution::new(vec![(Value::Int32(15), 0.7)])), + TestDistribution::new(vec![(Value::Int32(15), 0.7)]), )); let expr_tree = bin_op(BinOpType::Lt, col_ref(0), cnst(Value::Int32(15))); let expr_tree_rev = bin_op(BinOpType::Gt, cnst(Value::Int32(15)), col_ref(0)); @@ -1217,8 +1224,8 @@ mod tests { #[test] fn test_colref_lt_constint_with_mcv_at_border() { - let cost_model = create_one_column_cost_model(PerColumnStats::new( - Box::new(MockMostCommonValues { + let cost_model = create_one_column_cost_model(TestPerColumnStats::new( + TestMostCommonValues { mcvs: vec![ (Value::Int32(6), 0.05), (Value::Int32(10), 0.1), @@ -1227,10 +1234,10 @@ mod tests { ] .into_iter() .collect(), - }), + }, 11, // there are 4 MCVs which together add up to 0.3. With 11 total ndistinct, each remaining value has freq 0.1 0.0, - Box::new(MockDistribution::new(vec![(Value::Int32(15), 0.7)])), + TestDistribution::new(vec![(Value::Int32(15), 0.7)]), )); let expr_tree = bin_op(BinOpType::Lt, col_ref(0), cnst(Value::Int32(15))); let expr_tree_rev = bin_op(BinOpType::Gt, cnst(Value::Int32(15)), col_ref(0)); @@ -1252,11 +1259,11 @@ mod tests { /// The only interesting thing to test is that if there are nulls, those aren't included in GT #[test] fn test_colref_gt_constint_no_nulls() { - let cost_model = create_one_column_cost_model(PerColumnStats::new( - Box::new(MockMostCommonValues::empty()), + let cost_model = create_one_column_cost_model(TestPerColumnStats::new( + TestMostCommonValues::empty(), 10, 0.0, - Box::new(MockDistribution::new(vec![(Value::Int32(15), 0.7)])), + TestDistribution::new(vec![(Value::Int32(15), 0.7)]), )); let expr_tree = bin_op(BinOpType::Gt, col_ref(0), cnst(Value::Int32(15))); let expr_tree_rev = bin_op(BinOpType::Lt, cnst(Value::Int32(15)), col_ref(0)); @@ -1276,11 +1283,11 @@ mod tests { #[test] fn test_colref_gt_constint_with_nulls() { - let cost_model = create_one_column_cost_model(PerColumnStats::new( - Box::new(MockMostCommonValues::empty()), + let cost_model = create_one_column_cost_model(TestPerColumnStats::new( + TestMostCommonValues::empty(), 10, 0.1, - Box::new(MockDistribution::new(vec![(Value::Int32(15), 0.7)])), + TestDistribution::new(vec![(Value::Int32(15), 0.7)]), )); let expr_tree = bin_op(BinOpType::Gt, col_ref(0), cnst(Value::Int32(15))); let expr_tree_rev = bin_op(BinOpType::Lt, cnst(Value::Int32(15)), col_ref(0)); @@ -1302,11 +1309,11 @@ mod tests { /// As with above, I have one test without nulls and one test with nulls #[test] fn test_colref_geq_constint_no_nulls() { - let cost_model = create_one_column_cost_model(PerColumnStats::new( - Box::new(MockMostCommonValues::empty()), + let cost_model = create_one_column_cost_model(TestPerColumnStats::new( + TestMostCommonValues::empty(), 10, 0.0, - Box::new(MockDistribution::new(vec![(Value::Int32(15), 0.7)])), + TestDistribution::new(vec![(Value::Int32(15), 0.7)]), )); let expr_tree = bin_op(BinOpType::Geq, col_ref(0), cnst(Value::Int32(15))); let expr_tree_rev = bin_op(BinOpType::Leq, cnst(Value::Int32(15)), col_ref(0)); @@ -1326,11 +1333,11 @@ mod tests { #[test] fn test_colref_geq_constint_with_nulls() { - let cost_model = create_one_column_cost_model(PerColumnStats::new( - Box::new(MockMostCommonValues::empty()), + let cost_model = create_one_column_cost_model(TestPerColumnStats::new( + TestMostCommonValues::empty(), 9, // 90% of the values aren't nulls since null_frac = 0.1. if there are 9 distinct non-null values, each will have 0.1 frequency 0.1, - Box::new(MockDistribution::new(vec![(Value::Int32(15), 0.7)])), + TestDistribution::new(vec![(Value::Int32(15), 0.7)]), )); let expr_tree = bin_op(BinOpType::Geq, col_ref(0), cnst(Value::Int32(15))); let expr_tree_rev = bin_op(BinOpType::Leq, cnst(Value::Int32(15)), col_ref(0)); @@ -1351,8 +1358,8 @@ mod tests { #[test] fn test_and() { - let cost_model = create_one_column_cost_model(PerColumnStats::new( - Box::new(MockMostCommonValues { + let cost_model = create_one_column_cost_model(TestPerColumnStats::new( + TestMostCommonValues { mcvs: vec![ (Value::Int32(1), 0.3), (Value::Int32(5), 0.5), @@ -1360,10 +1367,10 @@ mod tests { ] .into_iter() .collect(), - }), + }, 0, 0.0, - Box::new(MockDistribution::empty()), + TestDistribution::empty(), )); let eq1 = bin_op(BinOpType::Eq, col_ref(0), cnst(Value::Int32(1))); let eq5 = bin_op(BinOpType::Eq, col_ref(0), cnst(Value::Int32(5))); @@ -1391,8 +1398,8 @@ mod tests { #[test] fn test_or() { - let cost_model = create_one_column_cost_model(PerColumnStats::new( - Box::new(MockMostCommonValues { + let cost_model = create_one_column_cost_model(TestPerColumnStats::new( + TestMostCommonValues { mcvs: vec![ (Value::Int32(1), 0.3), (Value::Int32(5), 0.5), @@ -1400,10 +1407,10 @@ mod tests { ] .into_iter() .collect(), - }), + }, 0, 0.0, - Box::new(MockDistribution::empty()), + TestDistribution::empty(), )); let eq1 = bin_op(BinOpType::Eq, col_ref(0), cnst(Value::Int32(1))); let eq5 = bin_op(BinOpType::Eq, col_ref(0), cnst(Value::Int32(5))); @@ -1431,11 +1438,11 @@ mod tests { #[test] fn test_not_no_nulls() { - let cost_model = create_one_column_cost_model(PerColumnStats::new( - Box::new(MockMostCommonValues::new(vec![(Value::Int32(1), 0.3)])), + let cost_model = create_one_column_cost_model(TestPerColumnStats::new( + TestMostCommonValues::new(vec![(Value::Int32(1), 0.3)]), 0, 0.0, - Box::new(MockDistribution::empty()), + TestDistribution::empty(), )); let expr_tree = un_op( UnOpType::Not, @@ -1453,11 +1460,11 @@ mod tests { #[test] fn test_not_with_nulls() { - let cost_model = create_one_column_cost_model(PerColumnStats::new( - Box::new(MockMostCommonValues::new(vec![(Value::Int32(1), 0.3)])), + let cost_model = create_one_column_cost_model(TestPerColumnStats::new( + TestMostCommonValues::new(vec![(Value::Int32(1), 0.3)]), 0, 0.1, - Box::new(MockDistribution::empty()), + TestDistribution::empty(), )); let expr_tree = un_op( UnOpType::Not, diff --git a/optd-datafusion-repr/src/cost/stats.rs b/optd-datafusion-repr/src/cost/stats.rs index e59277a1..86d20724 100644 --- a/optd-datafusion-repr/src/cost/stats.rs +++ b/optd-datafusion-repr/src/cost/stats.rs @@ -11,7 +11,7 @@ impl Distribution for TDigest { Value::Int32(i) => self.cdf(*i as f64), Value::Int64(i) => self.cdf(*i as f64), Value::Int128(i) => self.cdf(*i as f64), - Value::Float(i) => self.cdf(**i), + Value::Float(i) => self.cdf(*i.0), _ => panic!("Value is not a number"), } } diff --git a/optd-datafusion-repr/src/lib.rs b/optd-datafusion-repr/src/lib.rs index 6a5acca0..fa19b56c 100644 --- a/optd-datafusion-repr/src/lib.rs +++ b/optd-datafusion-repr/src/lib.rs @@ -3,7 +3,10 @@ use std::{collections::HashMap, sync::Arc}; use anyhow::Result; -use cost::{AdaptiveCostModel, BaseTableStats, RuntimeAdaptionStorage, DEFAULT_DECAY}; +use cost::{ + adaptive_cost::DataFusionAdaptiveCostModel, base_cost::DataFusionBaseTableStats, + AdaptiveCostModel, BaseTableStats, RuntimeAdaptionStorage, DEFAULT_DECAY, +}; use optd_core::{ cascades::{CascadesOptimizer, GroupId, OptimizerProperties}, rel_node::RelNodeMetaMap, @@ -80,7 +83,7 @@ impl DatafusionOptimizer { /// Create an optimizer with partial explore (otherwise it's too slow). pub fn new_physical( catalog: Arc, - stats: BaseTableStats, + stats: DataFusionBaseTableStats, enable_adaptive: bool, ) -> Self { let rules = Self::default_rules(); @@ -125,7 +128,7 @@ impl DatafusionOptimizer { RuleWrapper::new_heuristic(Arc::new(EliminateFilterRule::new())), ); - let cost_model = AdaptiveCostModel::new(1000, BaseTableStats::default()); // very large decay + let cost_model = DataFusionAdaptiveCostModel::new(1000, BaseTableStats::default()); // very large decay let runtime_statistics = cost_model.get_runtime_map(); let optimizer = CascadesOptimizer::new( rule_wrappers, diff --git a/optd-datafusion-repr/src/plan_nodes/expr.rs b/optd-datafusion-repr/src/plan_nodes/expr.rs index f4dfdd85..4093388c 100644 --- a/optd-datafusion-repr/src/plan_nodes/expr.rs +++ b/optd-datafusion-repr/src/plan_nodes/expr.rs @@ -5,7 +5,7 @@ use itertools::Itertools; use pretty_xmlish::Pretty; use serde::{Deserialize, Serialize}; -use optd_core::rel_node::{RelNode, RelNodeMetaMap, Value}; +use optd_core::rel_node::{RelNode, RelNodeMetaMap, SerializableOrderedF64, Value}; use super::{Expr, OptRelNode, OptRelNodeRef, OptRelNodeTyp}; @@ -172,7 +172,10 @@ impl ConstantExpr { } pub fn float64(value: f64) -> Self { - Self::new_with_type(Value::Float(value.into()), ConstantType::Float64) + Self::new_with_type( + Value::Float(SerializableOrderedF64(value.into())), + ConstantType::Float64, + ) } pub fn date(value: i64) -> Self { @@ -180,7 +183,10 @@ impl ConstantExpr { } pub fn decimal(value: f64) -> Self { - Self::new_with_type(Value::Float(value.into()), ConstantType::Decimal) + Self::new_with_type( + Value::Float(SerializableOrderedF64(value.into())), + ConstantType::Decimal, + ) } /// Gets the constant value. diff --git a/optd-gungnir/Cargo.toml b/optd-gungnir/Cargo.toml index 34dfb8a6..221b705b 100644 --- a/optd-gungnir/Cargo.toml +++ b/optd-gungnir/Cargo.toml @@ -8,4 +8,5 @@ edition = "2021" [dependencies] itertools = "0.11" rand = "0.8" -crossbeam = "0.8" \ No newline at end of file +crossbeam = "0.8" +serde = {version = "1.0", features = ["derive"]} \ No newline at end of file diff --git a/optd-gungnir/src/stats/tdigest.rs b/optd-gungnir/src/stats/tdigest.rs index 7f24d08c..262efbc6 100644 --- a/optd-gungnir/src/stats/tdigest.rs +++ b/optd-gungnir/src/stats/tdigest.rs @@ -4,12 +4,13 @@ //! For more details, refer to: https://arxiv.org/pdf/1902.04023.pdf use itertools::Itertools; +use serde::{Deserialize, Serialize}; use std::f64::consts::PI; pub const DEFAULT_COMPRESSION: f64 = 200.0; /// The TDigest structure for the statistical aggregator to query quantiles. -#[derive(Clone)] +#[derive(Clone, Serialize, Deserialize)] pub struct TDigest { /// A sorted array of Centroids, according to their mean. centroids: Vec, @@ -20,7 +21,7 @@ pub struct TDigest { } /// A Centroid is a cluster of aggregated data points. -#[derive(PartialEq, PartialOrd, Clone)] +#[derive(PartialEq, PartialOrd, Clone, Serialize, Deserialize)] struct Centroid { /// Mean of all aggregated points in this cluster. mean: f64, diff --git a/optd-perftest/src/benchmark.rs b/optd-perftest/src/benchmark.rs index ba8fb9a2..1056b198 100644 --- a/optd-perftest/src/benchmark.rs +++ b/optd-perftest/src/benchmark.rs @@ -37,6 +37,17 @@ impl Benchmark { dbname.to_lowercase() } + /// Use this when you need a unique file name. The rules for file names are different from the + /// rules for database names, so this is a different function + pub fn get_fname(&self) -> String { + match self { + Self::Test => String::from("test"), + Self::Tpch(tpch_config) => { + format!("tpch_sf{}", tpch_config.scale_factor) + } + } + } + /// An ID is just a unique string identifying the benchmark /// It's not always used in the same situations as get_dbname(), so it's a separate function pub fn get_id(&self) -> String { diff --git a/optd-perftest/src/cardtest.rs b/optd-perftest/src/cardtest.rs index c6b97b8a..6c4148a3 100644 --- a/optd-perftest/src/cardtest.rs +++ b/optd-perftest/src/cardtest.rs @@ -103,13 +103,14 @@ pub trait CardtestRunnerDBMSHelper { pub async fn cardtest>( workspace_dpath: P, + use_cached_optd_stats: bool, pguser: &str, pgpassword: &str, tpch_config: TpchConfig, ) -> anyhow::Result>> { let pg_dbms = Box::new(PostgresDBMS::build(&workspace_dpath, pguser, pgpassword)?); let truecard_getter = pg_dbms.clone(); - let df_dbms = Box::new(DatafusionDBMS::new(&workspace_dpath).await?); + let df_dbms = Box::new(DatafusionDBMS::new(&workspace_dpath, use_cached_optd_stats).await?); let dbmss: Vec> = vec![pg_dbms, df_dbms]; let tpch_benchmark = Benchmark::Tpch(tpch_config.clone()); diff --git a/optd-perftest/src/datafusion_dbms.rs b/optd-perftest/src/datafusion_dbms.rs index 5b3b8d11..8a6c3bbe 100644 --- a/optd-perftest/src/datafusion_dbms.rs +++ b/optd-perftest/src/datafusion_dbms.rs @@ -1,8 +1,7 @@ use std::{ - fs, + fs::{self, File}, path::{Path, PathBuf}, sync::Arc, - time::Instant, }; use crate::{ @@ -27,11 +26,15 @@ use datafusion::{ use datafusion_optd_cli::helper::unescape_input; use lazy_static::lazy_static; use optd_datafusion_bridge::{DatafusionCatalog, OptdQueryPlanner}; -use optd_datafusion_repr::{cost::BaseTableStats, cost::PerTableStats, DatafusionOptimizer}; +use optd_datafusion_repr::{ + cost::{base_cost::DataFusionBaseTableStats, BaseTableStats, PerTableStats}, + DatafusionOptimizer, +}; use regex::Regex; pub struct DatafusionDBMS { workspace_dpath: PathBuf, + use_cached_stats: bool, ctx: SessionContext, } @@ -58,9 +61,13 @@ impl CardtestRunnerDBMSHelper for DatafusionDBMS { } impl DatafusionDBMS { - pub async fn new>(workspace_dpath: P) -> anyhow::Result { + pub async fn new>( + workspace_dpath: P, + use_cached_stats: bool, + ) -> anyhow::Result { Ok(DatafusionDBMS { workspace_dpath: workspace_dpath.as_ref().to_path_buf(), + use_cached_stats, ctx: Self::new_session_ctx(None).await?, }) } @@ -70,12 +77,14 @@ impl DatafusionDBMS { /// /// A more ideal way to generate statistics would be to use the `ANALYZE` /// command in SQL, but DataFusion does not support that yet. - async fn clear_state(&mut self, stats: Option) -> anyhow::Result<()> { + async fn clear_state(&mut self, stats: Option) -> anyhow::Result<()> { self.ctx = Self::new_session_ctx(stats).await?; Ok(()) } - async fn new_session_ctx(stats: Option) -> anyhow::Result { + async fn new_session_ctx( + stats: Option, + ) -> anyhow::Result { let session_config = SessionConfig::from_env()?.with_information_schema(true); let rn_config = RuntimeConfig::new(); let runtime_env = RuntimeEnv::new(rn_config.clone())?; @@ -131,8 +140,6 @@ impl DatafusionDBMS { } async fn eval_tpch_estcards(&self, tpch_config: &TpchConfig) -> anyhow::Result> { - let start = Instant::now(); - let tpch_kit = TpchKit::build(&self.workspace_dpath)?; tpch_kit.gen_queries(tpch_config)?; @@ -143,9 +150,6 @@ impl DatafusionDBMS { estcards.push(estcard); } - let duration = start.elapsed(); - println!("datafusion eval_tpch_estcards duration: {:?}", duration); - Ok(estcards) } @@ -199,10 +203,29 @@ impl DatafusionDBMS { async fn get_benchmark_stats( &mut self, benchmark: &Benchmark, - ) -> anyhow::Result { - match benchmark { - Benchmark::Tpch(tpch_config) => self.get_tpch_stats(tpch_config).await, - _ => unimplemented!(), + ) -> anyhow::Result { + let benchmark_fname = benchmark.get_fname(); + let stats_cache_fpath = self + .workspace_dpath + .join("datafusion_stats_caches") + .join(format!("{}.json", benchmark_fname)); + if self.use_cached_stats && stats_cache_fpath.exists() { + let file = File::open(&stats_cache_fpath)?; + Ok(serde_json::from_reader(file)?) + } else { + let base_table_stats = match benchmark { + Benchmark::Tpch(tpch_config) => self.get_tpch_stats(tpch_config).await?, + _ => unimplemented!(), + }; + + // regardless of whether self.use_cached_stats is true or false, we want to update the cache + // this way, even if we choose not to read from the cache, the cache still always has the + // most up to date version of the stats + fs::create_dir_all(stats_cache_fpath.parent().unwrap())?; + let file = File::create(&stats_cache_fpath)?; + serde_json::to_writer(file, &base_table_stats)?; + + Ok(base_table_stats) } } @@ -221,8 +244,6 @@ impl DatafusionDBMS { #[allow(dead_code)] async fn load_tpch_data_no_stats(&mut self, tpch_config: &TpchConfig) -> anyhow::Result<()> { - let start = Instant::now(); - // Generate the tables. let tpch_kit = TpchKit::build(&self.workspace_dpath)?; tpch_kit.gen_tables(tpch_config)?; @@ -269,15 +290,13 @@ impl DatafusionDBMS { .await?; } - let duration = start.elapsed(); - println!("datafusion load_tpch_data duration: {:?}", duration); - Ok(()) } - async fn get_tpch_stats(&mut self, tpch_config: &TpchConfig) -> anyhow::Result { - let start = Instant::now(); - + async fn get_tpch_stats( + &mut self, + tpch_config: &TpchConfig, + ) -> anyhow::Result { // Generate the tables let tpch_kit = TpchKit::build(&self.workspace_dpath)?; tpch_kit.gen_tables(tpch_config)?; @@ -317,10 +336,8 @@ impl DatafusionDBMS { tbl_name.to_string(), PerTableStats::from_record_batches(batch_iter)?, ); - log::debug!("statistics generated for table: {}", tbl_name); } - let duration = start.elapsed(); - println!("datafusion load_tpch_stats duration: {:?}", duration); + Ok(base_table_stats) } } diff --git a/optd-perftest/src/main.rs b/optd-perftest/src/main.rs index cc27e050..8ceb0819 100644 --- a/optd-perftest/src/main.rs +++ b/optd-perftest/src/main.rs @@ -7,7 +7,7 @@ use std::fs; #[derive(Parser)] struct Cli { - #[arg(long)] + #[clap(long)] #[clap(default_value = "optd_perftest_workspace")] #[clap( help = "The directory where artifacts required for performance testing (such as pgdata or TPC-H queries) are generated. See comment of parse_pathstr() to see what paths are allowed (TLDR: absolute and relative both ok)." @@ -21,26 +21,36 @@ struct Cli { #[derive(Subcommand)] enum Commands { Cardtest { - #[arg(long)] + #[clap(long)] #[clap(default_value = "0.01")] scale_factor: f64, - #[arg(long)] + #[clap(long)] #[clap(default_value = "15721")] seed: i32, - #[arg(long)] + #[clap(long)] #[clap(value_delimiter = ',', num_args = 1..)] // this is the current list of all queries that work in perftest #[clap(default_value = "2,3,5,6,7,8,9,10,11,12,13,14,17")] + #[clap(help = "The queries to get the Q-error of")] query_ids: Vec, - #[arg(long)] + #[clap(long)] + #[clap(action)] + #[clap(help = "Whether to use the cached optd stats/cache generated stats")] + // this is an option that is not enabled by default so that the user doesn't + // accidentally use a stale version of the stats + // regardless of whether this is true or false, we still _write_ to the cache + // so that the cache always has the latest version of the stats + use_cached_optd_stats: bool, + + #[clap(long)] #[clap(default_value = "default_user")] #[clap(help = "The name of a user with superuser privileges")] pguser: String, - #[arg(long)] + #[clap(long)] #[clap(default_value = "password")] #[clap(help = "The name of a user with superuser privileges")] pgpassword: String, @@ -67,6 +77,7 @@ async fn main() -> anyhow::Result<()> { scale_factor, seed, query_ids, + use_cached_optd_stats, pguser, pgpassword, } => { @@ -76,8 +87,14 @@ async fn main() -> anyhow::Result<()> { seed, query_ids: query_ids.clone(), }; - let cardinfo_alldbs = - cardtest::cardtest(&workspace_dpath, &pguser, &pgpassword, tpch_config).await?; + let cardinfo_alldbs = cardtest::cardtest( + &workspace_dpath, + use_cached_optd_stats, + &pguser, + &pgpassword, + tpch_config, + ) + .await?; println!(); println!(" Aggregate Q-Error Comparison"); let mut agg_qerror_table = Table::new(); diff --git a/optd-perftest/src/postgres_dbms.rs b/optd-perftest/src/postgres_dbms.rs index 61870f64..1f9e867c 100644 --- a/optd-perftest/src/postgres_dbms.rs +++ b/optd-perftest/src/postgres_dbms.rs @@ -13,7 +13,6 @@ use std::{ fs, io::Cursor, path::{Path, PathBuf}, - time::Instant, }; use tokio::fs::File; use tokio::io::AsyncReadExt; @@ -139,8 +138,6 @@ impl PostgresDBMS { client: &Client, tpch_config: &TpchConfig, ) -> anyhow::Result<()> { - let start = Instant::now(); - // set up TpchKit let tpch_kit = TpchKit::build(&self.workspace_dpath)?; @@ -158,8 +155,8 @@ impl PostgresDBMS { // load the constraints and indexes // TODO: constraints are currently broken - // let sql = fs::read_to_string(tpch_kit.constraints_fpath.to_str().unwrap())?; - // client.batch_execute(&sql).await?; + let sql = fs::read_to_string(tpch_kit.constraints_fpath.to_str().unwrap())?; + client.batch_execute(&sql).await?; let sql = fs::read_to_string(tpch_kit.indexes_fpath.to_str().unwrap())?; client.batch_execute(&sql).await?; @@ -168,9 +165,6 @@ impl PostgresDBMS { // this is standard practice for postgres benchmarking client.query("VACUUM FULL ANALYZE", &[]).await?; - let duration = start.elapsed(); - println!("postgres load_tpch_data duration: {:?}", duration); - Ok(()) } @@ -207,8 +201,6 @@ impl PostgresDBMS { client: &Client, tpch_config: &TpchConfig, ) -> anyhow::Result> { - let start = Instant::now(); - let tpch_kit = TpchKit::build(&self.workspace_dpath)?; tpch_kit.gen_queries(tpch_config)?; @@ -219,9 +211,6 @@ impl PostgresDBMS { estcards.push(estcard); } - let duration = start.elapsed(); - println!("postgres eval_tpch_estcards duration: {:?}", duration); - Ok(estcards) } @@ -247,8 +236,6 @@ impl PostgresDBMS { dbname: &str, // used by truecard_cache truecard_cache: &mut TruecardCache, ) -> anyhow::Result> { - let start = Instant::now(); - let tpch_kit = TpchKit::build(&self.workspace_dpath)?; tpch_kit.gen_queries(tpch_config)?; @@ -266,9 +253,6 @@ impl PostgresDBMS { truecards.push(truecard); } - let duration = start.elapsed(); - println!("postgres eval_tpch_truecards duration: {:?}", duration); - Ok(truecards) } diff --git a/optd-perftest/tests/cardtest_integration.rs b/optd-perftest/tests/cardtest_integration.rs index c1afc9ae..06db7bd4 100644 --- a/optd-perftest/tests/cardtest_integration.rs +++ b/optd-perftest/tests/cardtest_integration.rs @@ -44,6 +44,7 @@ mod tests { // make sure scale factor is low so the test runs fast "--scale-factor", "0.01", + "--use-cached-optd-stats", "--pguser", "test_user", "--pgpassword",