diff --git a/optd-core/src/cascades.rs b/optd-core/src/cascades.rs index 1d415fd1..006206cb 100644 --- a/optd-core/src/cascades.rs +++ b/optd-core/src/cascades.rs @@ -5,5 +5,5 @@ mod optimizer; mod tasks; use memo::Memo; -pub use optimizer::{CascadesOptimizer, GroupId, OptimizerProperties, RelNodeContext}; +pub use optimizer::{CascadesOptimizer, GroupId, SubGroupId, OptimizerProperties, RelNodeContext}; use tasks::Task; diff --git a/optd-core/src/cascades/memo.rs b/optd-core/src/cascades/memo.rs index 1fdf344a..85e71a9c 100644 --- a/optd-core/src/cascades/memo.rs +++ b/optd-core/src/cascades/memo.rs @@ -1,7 +1,5 @@ use std::{ - collections::{hash_map::Entry, HashMap, HashSet}, - fmt::Display, - sync::Arc, + collections::{hash_map::Entry, HashMap, HashSet}, fmt::Display, sync::Arc }; use anyhow::{bail, Result}; @@ -10,11 +8,12 @@ use std::any::Any; use crate::{ cost::Cost, + physical_prop::PhysicalPropsBuilder, property::PropertyBuilderAny, rel_node::{RelNode, RelNodeMeta, RelNodeMetaMap, RelNodeRef, RelNodeTyp, Value}, }; -use super::optimizer::{ExprId, GroupId}; +use super::optimizer::{ExprId, GroupId, SubGroupId}; pub type RelMemoNodeRef = Arc>; @@ -22,7 +21,7 @@ pub type RelMemoNodeRef = Arc>; #[derive(Clone, Debug, PartialEq, Eq, Hash)] pub struct RelMemoNode { pub typ: T, - pub children: Vec, + pub children: Vec<(GroupId,SubGroupId)>, pub data: Option, } @@ -33,7 +32,7 @@ impl std::fmt::Display for RelMemoNode { write!(f, " {}", data)?; } for child in &self.children { - write!(f, " {}", child)?; + write!(f, " {}, {}", child.0, child.1)?; } write!(f, ")") } @@ -47,16 +46,72 @@ pub struct Winner { } #[derive(Default, Debug, Clone)] -pub struct GroupInfo { +pub struct SubGroupInfo { pub winner: Option, } -pub(crate) struct Group { - pub(crate) group_exprs: HashSet, - pub(crate) info: GroupInfo, +pub(crate) struct SubGroup { + pub(crate) sub_group_info: SubGroupInfo, + pub(crate) sub_group_exprs: HashSet, +} + +pub(crate) struct Group> { + /// sub_groups are for special requiredPhysicalProps + /// all exprs within one sub group are logically equivalent and provide same physical props + /// one expr can be in multiple sub groups + /// the first sub_group in the group is the default sub group containing all exprs in the group + /// eg: a group of join exprs, there are two subgroups, 0 is for all exprs, others are for special requirement + /// sub_groups 0: + /// sub_groups 1: + pub(crate) sub_groups: Vec, + pub(crate) physical_props_builder: Arc

, + pub(crate) physical_props: Vec, + pub(crate) sub_group_physical_prop_map: HashMap, pub(crate) properties: Arc<[Box]>, } +impl> Group{ + pub fn new(physical_props_builder: Arc

) -> Self { + let mut group = Group:: { + sub_groups: Vec::new(), + physical_props_builder: physical_props_builder.clone(), + physical_props: Vec::new(), + sub_group_physical_prop_map: HashMap::new(), + properties: Vec::new().into(), + }; + let default_sub_group = SubGroup { + sub_group_info: SubGroupInfo{ + winner: None, + }, + sub_group_exprs: HashSet::new(), + }; + group.sub_groups.push(default_sub_group); + group.physical_props.push(physical_props_builder.any()); + group.sub_group_physical_prop_map.insert(physical_props_builder.any(), SubGroupId(0)); + group + } + + pub fn insert_expr_to_default_sub_group(&mut self, expr_id: ExprId){ + self.sub_groups[0].sub_group_exprs.insert(expr_id); + } + + pub fn sub_group_exprs(&self, sub_group_id: SubGroupId) -> &HashSet { + &self.sub_groups[sub_group_id.0].sub_group_exprs + } + + pub fn group_exprs(&self) -> &HashSet { + &(self.sub_groups[0].sub_group_exprs) + } + + pub fn group_exprs_mut(&mut self) -> &mut HashSet { + &mut self.sub_groups[0].sub_group_exprs + } + + pub fn default_sub_group(&self) -> &SubGroup { + &self.sub_groups[0] + } +} + #[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug, Default, Hash)] struct ReducedGroupId(usize); @@ -72,18 +127,20 @@ impl Display for ReducedGroupId { } } -pub struct Memo { +pub struct Memo> { expr_id_to_group_id: HashMap, expr_id_to_expr_node: HashMap>, expr_node_to_expr_id: HashMap, ExprId>, - groups: HashMap, + groups: HashMap>, group_expr_counter: usize, merged_groups: HashMap, + required_root_props: P::PhysicalProps, + physical_property_builders: Arc

, property_builders: Arc<[Box>]>, } -impl Memo { - pub fn new(property_builders: Arc<[Box>]>) -> Self { +impl> Memo { + pub fn new(property_builders: Arc<[Box>]>, required_root_props: P::PhysicalProps, physical_property_builders: Arc

) -> Self { Self { expr_id_to_group_id: HashMap::new(), expr_id_to_expr_node: HashMap::new(), @@ -91,6 +148,8 @@ impl Memo { groups: HashMap::new(), group_expr_counter: 0, merged_groups: HashMap::new(), + required_root_props, + physical_property_builders, property_builders, } } @@ -122,7 +181,7 @@ impl Memo { let group_a_exprs = self.get_all_exprs_in_group(group_a.as_group_id()); for expr_id in group_a_exprs { let expr_node = self.expr_id_to_expr_node.get(&expr_id).unwrap(); - self.add_expr_to_group(expr_id, group_b, expr_node.as_ref().clone()); + self.add_expr_to_group_default_sub_group(expr_id, group_b, expr_node.as_ref().clone()); } self.merged_groups @@ -154,7 +213,7 @@ impl Memo { /// Add or get an expression into the memo, returns the group id and the expr id. If `GroupId` is `None`, /// create a new group. Otherwise, add the expression to the group. - pub fn add_new_group_expr( + pub fn add_new_group_expr_to_default_sub_group( &mut self, rel_node: RelNodeRef, add_to_group_id: Option, @@ -164,7 +223,7 @@ impl Memo { self.merge_group(grp_a, grp_b); }; - let (group_id, expr_id) = self.add_new_group_expr_inner( + let (group_id, expr_id) = self.add_new_group_expr_to_default_sub_group_inner( rel_node, add_to_group_id.map(|x| self.get_reduced_group_id(x)), ); @@ -176,10 +235,11 @@ impl Memo { .children .iter() .map(|child| { - if let Some(group) = child.typ.extract_group() { + if let Some(group) = child.typ.extract_group_and_sub_group() { group } else { - self.get_expr_info(child.clone()).0 + // TODO(avery): not sure when to use get expr info and if this is ok to return SubGroup(0) + (self.get_expr_info(child.clone()).0, SubGroupId(0)) } }) .collect::>(); @@ -203,7 +263,7 @@ impl Memo { .children .iter() .map(|child| { - let group_id = self.get_reduced_group_id(*child); + let group_id = self.get_reduced_group_id(child.0); self.groups[&group_id].properties.clone() }) .collect_vec(); @@ -229,7 +289,7 @@ impl Memo { /// If group_id exists, it adds expr_id to the existing group /// Otherwise, it creates a new group of that group_id and insert expr_id into the new group - fn add_expr_to_group( + fn add_expr_to_group_default_sub_group( &mut self, expr_id: ExprId, group_id: ReducedGroupId, @@ -237,15 +297,12 @@ impl Memo { ) { if let Entry::Occupied(mut entry) = self.groups.entry(group_id) { let group = entry.get_mut(); - group.group_exprs.insert(expr_id); + group.insert_expr_to_default_sub_group(expr_id); return; } - let mut group = Group { - group_exprs: HashSet::new(), - info: GroupInfo::default(), - properties: self.infer_properties(memo_node).into(), - }; - group.group_exprs.insert(expr_id); + let mut group = Group::::new(self.physical_property_builders.clone()); + group.properties = self.infer_properties(memo_node).into(); + group.insert_expr_to_default_sub_group(expr_id); self.groups.insert(group_id, group); } @@ -262,7 +319,7 @@ impl Memo { if let Entry::Occupied(mut entry) = self.groups.entry(replace_group_id) { let group = entry.get_mut(); - if !group.group_exprs.contains(&expr_id) { + if !group.group_exprs().contains(&expr_id) { unreachable!("expr not found in group in replace_group_expr"); } @@ -270,10 +327,10 @@ impl Memo { .children .iter() .map(|child| { - if let Some(group) = child.typ.extract_group() { + if let Some(group) = child.typ.extract_group_and_sub_group() { group } else { - self.add_new_group_expr(child.clone(), None).0 + (self.add_new_group_expr_to_default_sub_group(child.clone(), None).0, SubGroupId(0)) } }) .collect::>(); @@ -310,7 +367,7 @@ impl Memo { unreachable!("group not found in replace_group_expr"); } - fn add_new_group_expr_inner( + fn add_new_group_expr_to_default_sub_group_inner( &mut self, rel_node: RelNodeRef, add_to_group_id: Option, @@ -319,10 +376,10 @@ impl Memo { .children .iter() .map(|child| { - if let Some(group) = child.typ.extract_group() { + if let Some(group) = child.typ.extract_group_and_sub_group() { group } else { - self.add_new_group_expr(child.clone(), None).0 + (self.add_new_group_expr_to_default_sub_group(child.clone(), None).0, SubGroupId(0)) } }) .collect::>(); @@ -350,7 +407,7 @@ impl Memo { self.expr_id_to_group_id .insert(expr_id, group_id.as_group_id()); self.expr_node_to_expr_id.insert(memo_node.clone(), expr_id); - self.add_expr_to_group(expr_id, group_id, memo_node); + self.add_expr_to_group_default_sub_group(expr_id, group_id, memo_node); (group_id, expr_id) } @@ -377,6 +434,7 @@ impl Memo { pub fn get_all_group_bindings( &self, group_id: GroupId, + sub_group_id: SubGroupId, physical_only: bool, exclude_placeholder: bool, level: Option, @@ -384,7 +442,28 @@ impl Memo { let group_id = self.get_reduced_group_id(group_id); let group = self.groups.get(&group_id).expect("group not found"); group - .group_exprs + .sub_group_exprs(sub_group_id) + .iter() + .filter(|x| !physical_only || !self.get_expr_memoed(**x).typ.is_logical()) + .map(|&expr_id| { + self.get_all_expr_bindings(expr_id, physical_only, exclude_placeholder, level) + }) + .concat() + } + + pub fn get_all_sub_group_bindings( + &self, + group_id: GroupId, + sub_group_id: SubGroupId, + physical_only: bool, + exclude_placeholder: bool, + level: Option, + ) -> Vec> { + let group_id = self.get_reduced_group_id(group_id); + let group = self.groups.get(&group_id).expect("group not found"); + group + .sub_groups[sub_group_id.0] + .sub_group_exprs .iter() .filter(|x| !physical_only || !self.get_expr_memoed(**x).typ.is_logical()) .map(|&expr_id| { @@ -413,7 +492,7 @@ impl Memo { children: expr .children .iter() - .map(|x| Arc::new(RelNode::new_group(*x))) + .map(|x| Arc::new(RelNode::new_group(x.0, x.1))) .collect_vec(), data: expr.data.clone(), }); @@ -424,8 +503,9 @@ impl Memo { let mut children = vec![]; let mut cumulative = 1; for child in &expr.children { - let group_exprs = self.get_all_group_bindings( - *child, + let group_exprs = self.get_all_sub_group_bindings( + child.0, + child.1, physical_only, exclude_placeholder, level.map(|x| x - 1), @@ -456,7 +536,7 @@ impl Memo { pub fn get_all_exprs_in_group(&self, group_id: GroupId) -> Vec { let group_id = self.get_reduced_group_id(group_id); let group = self.groups.get(&group_id).expect("group not found"); - let mut exprs = group.group_exprs.iter().copied().collect_vec(); + let mut exprs = group.group_exprs().iter().copied().collect_vec(); exprs.sort(); exprs } @@ -472,23 +552,115 @@ impl Memo { ids } - pub fn get_group_info(&self, group_id: GroupId) -> GroupInfo { + pub fn get_sub_group_info_by_props(&self, group_id: GroupId, + required_physical_props: &P::PhysicalProps) -> Option { + let group = self.groups.get(&self.get_reduced_group_id(group_id)).unwrap(); + let sub_group_id = group.sub_group_physical_prop_map.get(required_physical_props); + if let Some(sub_group_id) = sub_group_id { + return Some(group.sub_groups[sub_group_id.0].sub_group_info.clone()); + } + None + } + + pub fn get_sub_group_info_by_id(&self, group_id: GroupId, sub_group_id: SubGroupId) -> SubGroupInfo { + let group = self.groups.get(&self.get_reduced_group_id(group_id)).unwrap(); + assert!(sub_group_id.0>=0 && sub_group_id.0 Option { + let group = self.groups.get(&self.get_reduced_group_id(group_id)).unwrap(); + group.sub_group_physical_prop_map.get(required_physical_props).copied() + } + + pub fn update_expr_children_sub_group_id(&mut self, expr_id: ExprId, children_props: Vec) -> ExprId{ + let memo_node = self.get_expr_memoed(expr_id); + let children = memo_node.children.iter().zip(children_props.iter()).map(|(child, prop)| { + let group_id = child.0; + let group = self.groups.get(&self.get_reduced_group_id(group_id)).unwrap(); + let sub_group_id = group.sub_group_physical_prop_map.get(prop).unwrap(); + (group_id, *sub_group_id) + }).collect(); + + if children == memo_node.children { + // if there's no required props for children node, sub_group_id remain unchanged (as 0) + return expr_id; + } + + let memo_node = RelMemoNode { + typ: memo_node.typ.clone(), + children, + data: memo_node.data.clone(), + }; + assert!(!self.expr_node_to_expr_id.contains_key(&memo_node)); + + let new_expr_id = self.next_expr_id(); + let group_id = self.get_group_id_of_expr_id(expr_id); + + self.expr_id_to_expr_node.insert(new_expr_id, memo_node.clone().into()); + self.expr_id_to_group_id.insert(new_expr_id, group_id); + self.expr_node_to_expr_id.insert(memo_node.clone(), new_expr_id); + let group = self.groups.get_mut(&self.get_reduced_group_id(group_id)).unwrap(); + group.insert_expr_to_default_sub_group(new_expr_id); + new_expr_id + } + + pub fn get_group_info(&self, group_id: GroupId) -> SubGroupInfo { self.groups .get(&self.get_reduced_group_id(group_id)) .as_ref() .unwrap() - .info + .default_sub_group() + .sub_group_info .clone() } - pub(crate) fn get_group(&self, group_id: GroupId) -> &Group { + pub(crate) fn get_group(&self, group_id: GroupId) -> &Group { self.groups .get(&self.get_reduced_group_id(group_id)) .as_ref() .unwrap() } - pub fn update_group_info(&mut self, group_id: GroupId, group_info: GroupInfo) { + pub fn update_sub_group_info( + &mut self, + group_id: GroupId, + expr_id: Option, + sub_group_info: SubGroupInfo, + physical_props: &P::PhysicalProps, + ) { + if let Some(ref winner) = sub_group_info.winner { + if !winner.impossible { + assert!( + winner.cost.0[0] != 0.0, + "{}", + self.get_expr_memoed(winner.expr_id) + ); + } + } + let group = self.groups.get_mut(&self.get_reduced_group_id(group_id)).unwrap(); + if let Some(sub_group_id) = group.sub_group_physical_prop_map.get(physical_props) { + group.sub_groups[sub_group_id.0].sub_group_info = sub_group_info; + if let Some(expr_id) = expr_id { + group.sub_groups[sub_group_id.0].sub_group_exprs.insert(expr_id); + } + return; + } + let sub_group_id = group.sub_groups.len(); + let mut exprs = HashSet::new(); + if let Some(expr_id) = expr_id { + exprs.insert(expr_id); + } + group.sub_groups.push(SubGroup { + sub_group_info, + sub_group_exprs: exprs, + }); + group.physical_props.push(physical_props.clone()); + group.sub_group_physical_prop_map + .insert(physical_props.clone(), SubGroupId(sub_group_id)); + } + + pub fn update_group_info(&mut self, group_id: GroupId, group_info: SubGroupInfo) { if let Some(ref winner) = group_info.winner { if !winner.impossible { assert!( @@ -499,22 +671,96 @@ impl Memo { } } let grp = self.groups.get_mut(&self.get_reduced_group_id(group_id)); - grp.unwrap().info = group_info; + grp.unwrap().sub_groups[0].sub_group_info = group_info; + } + + pub fn add_sub_group_expr(&mut self, new_expr: RelMemoNode, group_id: GroupId, physical_props: &P::PhysicalProps) -> ExprId{ + // 1. add the new expr to the memo table + if let Some(&expr_id) = self.expr_node_to_expr_id.get(&new_expr) { + return expr_id; + } + let expr_id = self.next_expr_id(); + let reduced_group_id = self.get_reduced_group_id(group_id); + + self.expr_id_to_expr_node + .insert(expr_id, new_expr.clone().into()); + self.expr_id_to_group_id + .insert(expr_id, reduced_group_id.as_group_id()); + self.expr_node_to_expr_id.insert(new_expr.clone(), expr_id); + + // 2. insert the expr id to sub group + // as add_sub_group_expr are called for counterpart exprs, there's + // no need to add the counterpart expr to the default sub group + let group = self.groups.get_mut(&self.get_reduced_group_id(group_id)).unwrap(); + if group.sub_group_physical_prop_map.contains_key(&physical_props) { + let sub_group_id = group.sub_group_physical_prop_map.get(&physical_props).unwrap(); + group.sub_groups[sub_group_id.0].sub_group_exprs.insert(expr_id); + return expr_id; + } + + let sub_group_id = group.sub_groups.len(); + let mut exprs = HashSet::new(); + exprs.insert(expr_id); + let sub_group_info = SubGroupInfo { winner: None }; + group.sub_groups.push(SubGroup { + sub_group_info, + sub_group_exprs: exprs, + }); + group.physical_props.push(physical_props.clone()); + group.sub_group_physical_prop_map + .insert(physical_props.clone(), SubGroupId(sub_group_id)); + return expr_id; } pub fn get_best_group_binding( &self, group_id: GroupId, + physical_props: P::PhysicalProps, meta: &mut Option, ) -> Result> { - let info = self.get_group_info(group_id); + let info = self.get_sub_group_info_by_props(group_id, &physical_props); + if let Some(info) = info{ + if let Some(winner) = info.winner { + if !winner.impossible { + let expr_id = winner.expr_id; + let expr = self.get_expr_memoed(expr_id); + let mut children = Vec::with_capacity(expr.children.len()); + for child in &expr.children { + children.push(self.get_best_sub_group_binding(child.0, child.1, meta)?); + } + let node = Arc::new(RelNode { + typ: expr.typ.clone(), + children, + data: expr.data.clone(), + }); + + if let Some(meta) = meta { + meta.insert( + node.as_ref() as *const _ as usize, + RelNodeMeta::new(group_id, winner.cost), + ); + } + return Ok(node); + } + } + } + bail!("no best group binding for group {} with required physical props {:?}", group_id, physical_props) + } + + pub fn get_best_sub_group_binding( + &self, + group_id: GroupId, + sub_group_id: SubGroupId, + meta: &mut Option, + ) -> Result> { + let info = self.get_sub_group_info_by_id(group_id, sub_group_id); if let Some(winner) = info.winner { if !winner.impossible { let expr_id = winner.expr_id; let expr = self.get_expr_memoed(expr_id); let mut children = Vec::with_capacity(expr.children.len()); for child in &expr.children { - children.push(self.get_best_group_binding(*child, meta)?); + children.push(self.get_best_sub_group_binding(child.0, child.1, meta)?); } let node = Arc::new(RelNode { typ: expr.typ.clone(), @@ -531,12 +777,14 @@ impl Memo { return Ok(node); } } - bail!("no best group binding for group {}", group_id) + bail!("no best group binding for group {} subgroup {}", group_id, sub_group_id) } pub fn clear_winner(&mut self) { for group in self.groups.values_mut() { - group.info.winner = None; + group.sub_groups.iter_mut().for_each(|sub_group| { + sub_group.sub_group_info.winner = None; + }); } } diff --git a/optd-core/src/cascades/optimizer.rs b/optd-core/src/cascades/optimizer.rs index d24eec70..7bbabbbd 100644 --- a/optd-core/src/cascades/optimizer.rs +++ b/optd-core/src/cascades/optimizer.rs @@ -7,15 +7,13 @@ use std::{ use anyhow::Result; use crate::{ - cost::CostModel, - optimizer::Optimizer, - property::{PropertyBuilder, PropertyBuilderAny}, - rel_node::{RelNodeMetaMap, RelNodeRef, RelNodeTyp}, - rules::RuleWrapper, + cost::CostModel, optimizer::Optimizer, + physical_prop::PhysicalPropsBuilder, + property::{PropertyBuilder, PropertyBuilderAny}, rel_node::{RelNodeMetaMap, RelNodeRef, RelNodeTyp}, rules::RuleWrapper }; use super::{ - memo::{GroupInfo, RelMemoNodeRef}, + memo::{SubGroupInfo, RelMemoNodeRef, RelMemoNode}, tasks::OptimizeGroupTask, Memo, Task, }; @@ -37,15 +35,17 @@ pub struct OptimizerProperties { pub partial_explore_space: Option, } -pub struct CascadesOptimizer { - memo: Memo, - pub(super) tasks: VecDeque>>, +pub struct CascadesOptimizer> { + memo: Memo, + pub(super) tasks: VecDeque>>, explored_group: HashSet, fired_rules: HashMap>, rules: Arc<[Arc>]>, disabled_rules: HashSet, - cost: Arc>, + cost: Arc>, property_builders: Arc<[Box>]>, + required_root_props: P::PhysicalProps, + physical_property_builders: Arc

, pub ctx: OptimizerContext, pub prop: OptimizerProperties, } @@ -56,12 +56,15 @@ pub struct CascadesOptimizer { pub struct RelNodeContext { pub group_id: GroupId, pub expr_id: ExprId, - pub children_group_ids: Vec, + pub children_group_ids: Vec<(GroupId, SubGroupId)>, } #[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug, Default, Hash)] pub struct GroupId(pub(super) usize); +#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug, Default, Hash)] +pub struct SubGroupId(pub usize); + #[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug, Default, Hash)] pub struct ExprId(pub usize); @@ -71,30 +74,40 @@ impl Display for GroupId { } } +impl Display for SubGroupId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "!{}", self.0) + } +} + impl Display for ExprId { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{}", self.0) } } -impl CascadesOptimizer { +impl> CascadesOptimizer { pub fn new( rules: Vec>>, - cost: Box>, + cost: Box>, property_builders: Vec>>, + physical_property_builders: Arc

, + required_root_props: P::PhysicalProps, ) -> Self { - Self::new_with_prop(rules, cost, property_builders, Default::default()) + Self::new_with_prop(rules, cost, property_builders, physical_property_builders, required_root_props, Default::default()) } pub fn new_with_prop( rules: Vec>>, - cost: Box>, + cost: Box>, property_builders: Vec>>, + physical_property_builders: Arc

, + required_root_props: P::PhysicalProps, prop: OptimizerProperties, ) -> Self { let tasks = VecDeque::new(); let property_builders: Arc<[_]> = property_builders.into(); - let memo = Memo::new(property_builders.clone()); + let memo = Memo::new(property_builders.clone(), required_root_props.clone(), physical_property_builders.clone()); Self { memo, tasks, @@ -104,12 +117,14 @@ impl CascadesOptimizer { cost: cost.into(), ctx: OptimizerContext::default(), property_builders, + physical_property_builders, + required_root_props, prop, disabled_rules: HashSet::new(), } } - pub fn cost(&self) -> Arc> { + pub fn cost(&self) -> Arc> { self.cost.clone() } @@ -129,30 +144,37 @@ impl CascadesOptimizer { self.disabled_rules.contains(&rule_id) } - pub fn dump(&self, group_id: Option) { + pub fn dump(&self, group_id: Option, sub_group_id: Option) { if let Some(group_id) = group_id { - fn dump_inner(this: &CascadesOptimizer, group_id: GroupId) { - if let Some(ref winner) = this.memo.get_group_info(group_id).winner { + fn dump_inner>(this: &CascadesOptimizer, group_id: GroupId, sub_group_id: Option) { + let mut real_sub_group_id = SubGroupId(0); + if sub_group_id.is_some(){ + real_sub_group_id = sub_group_id.unwrap(); + } + if let Some(ref winner) = this.memo.get_sub_group_info_by_id(group_id, real_sub_group_id).winner { let expr = this.memo.get_expr_memoed(winner.expr_id); assert!(!winner.impossible); if winner.cost.0[1] == 1.0 { return; } println!( - "group_id={} winner={} cost={} {}", + "group_id={} sub_group_id={} winner={} cost={} {}", group_id, + real_sub_group_id, winner.expr_id, this.cost.explain(&winner.cost), expr ); for child in &expr.children { - dump_inner(this, *child); + dump_inner(this, child.0, Some(child.1)); } } } - dump_inner(self, group_id); + dump_inner(self, group_id, sub_group_id); return; } + //TODO(avery): current implementation is that only dump default subgroup when no + // group id is passed. We might add dump for all subgroups in the future. for group_id in self.memo.get_all_group_ids() { let winner = if let Some(ref winner) = self.memo.get_group_info(group_id).winner { if winner.impossible { @@ -192,7 +214,7 @@ impl CascadesOptimizer { /// Clear the memo table and all optimizer states. pub fn step_clear(&mut self) { - self.memo = Memo::new(self.property_builders.clone()); + self.memo = Memo::new(self.property_builders.clone(), self.required_root_props.clone(), self.physical_property_builders.clone()); self.fired_rules.clear(); self.explored_group.clear(); } @@ -204,8 +226,8 @@ impl CascadesOptimizer { /// Optimize a `RelNode`. pub fn step_optimize_rel(&mut self, root_rel: RelNodeRef) -> Result { - let (group_id, _) = self.add_group_expr(root_rel, None); - self.fire_optimize_tasks(group_id)?; + let (group_id, _) = self.add_group_expr_to_default_sub_group(root_rel, None); + self.fire_optimize_tasks(group_id, self.physical_property_builders.clone(), self.required_root_props.clone())?; Ok(group_id) } @@ -213,14 +235,15 @@ impl CascadesOptimizer { pub fn step_get_optimize_rel( &self, group_id: GroupId, + physical_props: P::PhysicalProps, meta: &mut Option, ) -> Result> { - self.memo.get_best_group_binding(group_id, meta) + self.memo.get_best_group_binding(group_id, physical_props, meta) } - fn fire_optimize_tasks(&mut self, group_id: GroupId) -> Result<()> { + fn fire_optimize_tasks(&mut self, group_id: GroupId, physical_property_builders: Arc

, required_root_props: P::PhysicalProps) -> Result<()> { self.tasks - .push_back(Box::new(OptimizeGroupTask::new(group_id))); + .push_back(Box::new(OptimizeGroupTask::new(group_id, self.physical_property_builders.clone(), self.required_root_props.clone()))); // get the task from the stack self.ctx.budget_used = false; let plan_space_begin = self.memo.compute_plan_space(); @@ -254,9 +277,9 @@ impl CascadesOptimizer { } fn optimize_inner(&mut self, root_rel: RelNodeRef) -> Result> { - let (group_id, _) = self.add_group_expr(root_rel, None); - self.fire_optimize_tasks(group_id)?; - self.memo.get_best_group_binding(group_id, &mut None) + let (group_id, _) = self.add_group_expr_to_default_sub_group(root_rel, None); + self.fire_optimize_tasks(group_id, self.physical_property_builders.clone(), self.required_root_props.clone())?; + self.memo.get_best_group_binding(group_id, self.required_root_props.clone(), &mut None) } pub fn resolve_group_id(&self, root_rel: RelNodeRef) -> GroupId { @@ -275,12 +298,12 @@ impl CascadesOptimizer { self.memo.get_expr_info(expr) } - pub(super) fn add_group_expr( + pub(super) fn add_group_expr_to_default_sub_group( &mut self, expr: RelNodeRef, group_id: Option, ) -> (GroupId, ExprId) { - self.memo.add_new_group_expr(expr, group_id) + self.memo.add_new_group_expr_to_default_sub_group(expr, group_id) } pub(super) fn replace_group_expr( @@ -305,11 +328,11 @@ impl CascadesOptimizer { }); } - pub(super) fn get_group_info(&self, group_id: GroupId) -> GroupInfo { + pub(super) fn get_group_info(&self, group_id: GroupId) -> SubGroupInfo { self.memo.get_group_info(group_id) } - pub(super) fn update_group_info(&mut self, group_id: GroupId, group_info: GroupInfo) { + pub(super) fn update_group_info(&mut self, group_id: GroupId, group_info: SubGroupInfo) { self.memo.update_group_info(group_id, group_info) } @@ -317,17 +340,68 @@ impl CascadesOptimizer { self.memo.merge_group(group_a, group_b); } + pub(super) fn get_sub_group_info_by_props( + &self, + group_id: GroupId, + physical_props: &P::PhysicalProps, + ) -> Option { + self.memo.get_sub_group_info_by_props(group_id, physical_props) + } + + pub(super) fn get_sub_group_info_by_id( + &self, + group_id: GroupId, + sub_group_id: SubGroupId, + ) -> SubGroupInfo { + self.memo.get_sub_group_info_by_id(group_id, sub_group_id) + } + + pub(super) fn get_sub_group_id( + &self, + group_id: GroupId, + physical_props: &P::PhysicalProps, + ) -> Option { + self.memo.get_sub_group_id(group_id, physical_props) + } + + pub(super) fn add_sub_group_expr( + &mut self, + expr: RelMemoNode, + group_id: GroupId, + physical_props: &P::PhysicalProps, + ) -> ExprId { + self.memo.add_sub_group_expr(expr, group_id, physical_props) + } + + pub(super) fn update_sub_group_info( + &mut self, + group_id: GroupId, + expr_id: Option, + sub_group_info: SubGroupInfo, + physical_props: &P::PhysicalProps, + ) { + self.memo.update_sub_group_info(group_id, expr_id, sub_group_info, physical_props) + } + + pub(super) fn update_expr_children_sub_group_id( + &mut self, + expr_id: ExprId, + required_props: Vec, + ) -> ExprId { + self.memo.update_expr_children_sub_group_id(expr_id, required_props) + } + /// Get the properties of a Cascades group /// P is the type of the property you expect /// idx is the idx of the property you want. The order of properties is defined /// by the property_builders parameter in CascadesOptimizer::new() - pub fn get_property_by_group>( + pub fn get_property_by_group>( &self, group_id: GroupId, idx: usize, - ) -> P::Prop { + ) -> PB::Prop { self.memo.get_group(group_id).properties[idx] - .downcast_ref::() + .downcast_ref::() .unwrap() .clone() } @@ -352,10 +426,11 @@ impl CascadesOptimizer { pub fn get_all_group_bindings( &self, group_id: GroupId, + sub_group_id: SubGroupId, physical_only: bool, ) -> Vec> { self.memo - .get_all_group_bindings(group_id, physical_only, true, Some(10)) + .get_all_group_bindings(group_id, sub_group_id, physical_only, true, Some(10)) } pub(super) fn is_group_explored(&self, group_id: GroupId) -> bool { @@ -390,12 +465,12 @@ impl CascadesOptimizer { } } -impl Optimizer for CascadesOptimizer { +impl> Optimizer for CascadesOptimizer { fn optimize(&mut self, root_rel: RelNodeRef) -> Result> { self.optimize_inner(root_rel) } - fn get_property>(&self, root_rel: RelNodeRef, idx: usize) -> P::Prop { - self.get_property_by_group::

(self.resolve_group_id(root_rel), idx) + fn get_property>(&self, root_rel: RelNodeRef, idx: usize) -> PB::Prop { + self.get_property_by_group::(self.resolve_group_id(root_rel), idx) } } diff --git a/optd-core/src/cascades/tasks.rs b/optd-core/src/cascades/tasks.rs index 2b87da10..09b8f804 100644 --- a/optd-core/src/cascades/tasks.rs +++ b/optd-core/src/cascades/tasks.rs @@ -1,6 +1,6 @@ use anyhow::Result; -use crate::rel_node::RelNodeTyp; +use crate::{physical_prop::PhysicalPropsBuilder, rel_node::RelNodeTyp}; use super::CascadesOptimizer; @@ -16,8 +16,8 @@ pub use optimize_expression::OptimizeExpressionTask; pub use optimize_group::OptimizeGroupTask; pub use optimize_inputs::OptimizeInputsTask; -pub trait Task: 'static + Send + Sync { - fn execute(&self, optimizer: &mut CascadesOptimizer) -> Result>>>; +pub trait Task>: 'static + Send + Sync { + fn execute(&self, optimizer: &mut CascadesOptimizer) -> Result>>>; fn as_any(&self) -> &dyn std::any::Any; fn describe(&self) -> String; } diff --git a/optd-core/src/cascades/tasks/apply_rule.rs b/optd-core/src/cascades/tasks/apply_rule.rs index d73e0bdc..00ccc8dc 100644 --- a/optd-core/src/cascades/tasks/apply_rule.rs +++ b/optd-core/src/cascades/tasks/apply_rule.rs @@ -9,36 +9,50 @@ use crate::{ memo::RelMemoNodeRef, optimizer::{CascadesOptimizer, ExprId, RuleId}, tasks::{OptimizeExpressionTask, OptimizeInputsTask}, - GroupId, + GroupId, SubGroupId }, rel_node::{RelNode, RelNodeTyp}, rules::{OptimizeType, RuleMatcher}, + physical_prop::PhysicalPropsBuilder, }; use super::Task; -pub struct ApplyRuleTask { +/// ApplyRuleTask calls +/// 1. OptimizeExpression task for newly generate logical expr (logical->logical rule) +/// 2. OptimizeInputsTask for newly generate physical expr (logical->physical rule) +/// All the newly generated exprs are added to the default sub group +/// For required physical properties, it passes them to OptimizeInputsTask and OptimizeExpressionTask +/// +/// As we only move certain exprs to specific sub groups and rewrite their children in OptimizeInputsTask, +/// we don't care children node points to special subgroup in ApplyRuleTask, therefore the matcher +/// in ApplyRuleTask tries to match all the exprs in default sub group +pub struct ApplyRuleTask> { rule_id: RuleId, expr_id: ExprId, exploring: bool, + physical_props_builder: Arc

, + required_physical_props: P::PhysicalProps, } -impl ApplyRuleTask { - pub fn new(rule_id: RuleId, expr_id: ExprId, exploring: bool) -> Self { +impl> ApplyRuleTask { + pub fn new(rule_id: RuleId, expr_id: ExprId, exploring: bool, physical_props_builder: Arc

, required_physical_props: P::PhysicalProps) -> Self { Self { rule_id, expr_id, exploring, + physical_props_builder, + required_physical_props, } } } -fn match_node( +fn match_node>( typ: &T, children: &[RuleMatcher], pick_to: Option, node: RelMemoNodeRef, - optimizer: &CascadesOptimizer, + optimizer: &CascadesOptimizer, ) -> Vec>> { if let RuleMatcher::PickMany { .. } | RuleMatcher::IgnoreMany = children.last().unwrap() { } else { @@ -60,7 +74,8 @@ fn match_node( should_end = true; } RuleMatcher::PickOne { pick_to, expand } => { - let group_id = node.children[idx]; + assert!(node.children[idx].1 == SubGroupId(0), "can expr points to special subgroup happens here?"); + let group_id = node.children[idx].0; let node = if *expand { let mut exprs = optimizer.get_all_exprs_in_group(group_id); assert_eq!(exprs.len(), 1, "can only expand expression"); @@ -69,7 +84,7 @@ fn match_node( assert_eq!(bindings.len(), 1, "can only expand expression"); bindings.remove(0).as_ref().clone() } else { - RelNode::new_group(group_id) + RelNode::new_group(group_id, SubGroupId(0)) }; for pick in &mut picks { let res = pick.insert(*pick_to, node.clone()); @@ -83,7 +98,12 @@ fn match_node( RelNode::new_list( node.children[idx..] .iter() - .map(|x| Arc::new(RelNode::new_group(*x))) + .map(|x| + { + assert!(x.1 == SubGroupId(0), "can expr points to special subgroup happens here?"); + Arc::new(RelNode::new_group(x.0, SubGroupId(0))) + } + ) .collect_vec(), ), ); @@ -92,7 +112,7 @@ fn match_node( should_end = true; } _ => { - let new_picks = match_and_pick_group(child, node.children[idx], optimizer); + let new_picks = match_and_pick_group(child, node.children[idx].0, optimizer); let mut merged_picks = vec![]; for old_pick in &picks { for new_picks in &new_picks { @@ -114,7 +134,12 @@ fn match_node( children: node .children .iter() - .map(|x| RelNode::new_group(*x).into()) + .map(|x| + { + assert!(x.1 == SubGroupId(0), "can expr points to special subgroup happens here?"); + RelNode::new_group(x.0, SubGroupId(0)).into() + } + ) .collect_vec(), data: node.data.clone(), }, @@ -125,19 +150,19 @@ fn match_node( picks } -fn match_and_pick_expr( +fn match_and_pick_expr>( matcher: &RuleMatcher, expr_id: ExprId, - optimizer: &CascadesOptimizer, + optimizer: &CascadesOptimizer, ) -> Vec>> { let node = optimizer.get_expr_memoed(expr_id); match_and_pick(matcher, node, optimizer) } -fn match_and_pick_group( +fn match_and_pick_group>( matcher: &RuleMatcher, group_id: GroupId, - optimizer: &CascadesOptimizer, + optimizer: &CascadesOptimizer, ) -> Vec>> { let mut matches = vec![]; for expr_id in optimizer.get_all_exprs_in_group(group_id) { @@ -147,10 +172,10 @@ fn match_and_pick_group( matches } -fn match_and_pick( +fn match_and_pick>( matcher: &RuleMatcher, node: RelMemoNodeRef, - optimizer: &CascadesOptimizer, + optimizer: &CascadesOptimizer, ) -> Vec>> { match matcher { RuleMatcher::MatchAndPickNode { @@ -173,12 +198,12 @@ fn match_and_pick( } } -impl Task for ApplyRuleTask { +impl> Task for ApplyRuleTask { fn as_any(&self) -> &dyn std::any::Any { self } - fn execute(&self, optimizer: &mut CascadesOptimizer) -> Result>>> { + fn execute(&self, optimizer: &mut CascadesOptimizer) -> Result>>> { if optimizer.is_rule_fired(self.expr_id, self.rule_id) { return Ok(vec![]); } @@ -236,8 +261,8 @@ impl Task for ApplyRuleTask { // the expr returned by heuristic rule is a brand new one // so there's no optimizeExpressionTask for it in the original task list // we should set exploring as false to both envoke tranform rule and impl rule for it - tasks.push(Box::new(OptimizeExpressionTask::new(self.expr_id, false)) - as Box>); + tasks.push(Box::new(OptimizeExpressionTask::new(self.expr_id, false, self.physical_props_builder.clone(), self.required_physical_props.clone())) + as Box>); } continue; } @@ -250,16 +275,16 @@ impl Task for ApplyRuleTask { continue; } let expr_typ = typ.clone(); - let (_, expr_id) = optimizer.add_group_expr(expr.into(), Some(group_id)); + let (_, expr_id) = optimizer.add_group_expr_to_default_sub_group(expr.into(), Some(group_id)); trace!(event = "apply_rule", expr_id = %self.expr_id, rule_id = %self.rule_id, new_expr_id = %expr_id); if expr_typ.is_logical() { tasks.push( - Box::new(OptimizeExpressionTask::new(expr_id, self.exploring)) - as Box>, + Box::new(OptimizeExpressionTask::new(expr_id, self.exploring, self.physical_props_builder.clone(), self.required_physical_props.clone())) + as Box>, ); } else { tasks - .push(Box::new(OptimizeInputsTask::new(expr_id, true)) as Box>); + .push(Box::new(OptimizeInputsTask::new(expr_id, true, self.physical_props_builder.clone(), self.required_physical_props.clone())) as Box>); } } } diff --git a/optd-core/src/cascades/tasks/explore_group.rs b/optd-core/src/cascades/tasks/explore_group.rs index abfad8f7..5968aeef 100644 --- a/optd-core/src/cascades/tasks/explore_group.rs +++ b/optd-core/src/cascades/tasks/explore_group.rs @@ -1,5 +1,6 @@ use anyhow::Result; use tracing::trace; +use std::sync::Arc; use crate::{ cascades::{ @@ -7,26 +8,32 @@ use crate::{ tasks::OptimizeExpressionTask, }, rel_node::RelNodeTyp, + physical_prop::PhysicalPropsBuilder, }; use super::Task; -pub struct ExploreGroupTask { +pub struct ExploreGroupTask> { group_id: GroupId, + physical_props_builder: Arc

, + required_physical_props: P::PhysicalProps, } -impl ExploreGroupTask { - pub fn new(group_id: GroupId) -> Self { - Self { group_id } +impl> ExploreGroupTask { + pub fn new(group_id: GroupId, physical_props_builder: Arc

, required_physical_props: P::PhysicalProps) -> Self { + if !physical_props_builder.is_any(&required_physical_props){ + unreachable!("ExploreGroupTask should not have any required physical properties") + } + Self { group_id, physical_props_builder, required_physical_props } } } -impl Task for ExploreGroupTask { +impl> Task for ExploreGroupTask { fn as_any(&self) -> &dyn std::any::Any { self } - fn execute(&self, optimizer: &mut CascadesOptimizer) -> Result>>> { + fn execute(&self, optimizer: &mut CascadesOptimizer) -> Result>>> { trace!(event = "task_begin", task = "explore_group", group_id = %self.group_id); let mut tasks = vec![]; if optimizer.is_group_explored(self.group_id) { @@ -38,7 +45,7 @@ impl Task for ExploreGroupTask { for expr in exprs { let typ = optimizer.get_expr_memoed(expr).typ.clone(); if typ.is_logical() { - tasks.push(Box::new(OptimizeExpressionTask::new(expr, true)) as Box>); + tasks.push(Box::new(OptimizeExpressionTask::new(expr, true, self.physical_props_builder.clone(), self.required_physical_props.clone())) as Box>); } } optimizer.mark_group_explored(self.group_id); diff --git a/optd-core/src/cascades/tasks/optimize_expression.rs b/optd-core/src/cascades/tasks/optimize_expression.rs index 081dacf9..3c9106eb 100644 --- a/optd-core/src/cascades/tasks/optimize_expression.rs +++ b/optd-core/src/cascades/tasks/optimize_expression.rs @@ -1,5 +1,7 @@ use anyhow::Result; use tracing::trace; +use std::sync::Arc; + use crate::{ cascades::{ @@ -8,18 +10,27 @@ use crate::{ }, rel_node::{RelNodeTyp, Value}, rules::RuleMatcher, + physical_prop::PhysicalPropsBuilder }; use super::Task; -pub struct OptimizeExpressionTask { +/// OptimizeExpressionTask calls +/// 1. ExploreGroupTask for its children of the expr +/// 2. ApplyRuleTask for the expression +/// For required physical properties, it passes them to ApplyRuleTask +/// ExploreGroupTask does not need physical properties requirement, +/// as it is only for logical transformations +pub struct OptimizeExpressionTask> { expr_id: ExprId, exploring: bool, + physical_props_builder: Arc

, + required_physical_props: P::PhysicalProps, } -impl OptimizeExpressionTask { - pub fn new(expr_id: ExprId, exploring: bool) -> Self { - Self { expr_id, exploring } +impl> OptimizeExpressionTask { + pub fn new(expr_id: ExprId, exploring: bool, physical_props_builder: Arc

, required_physical_props: P::PhysicalProps) -> Self { + Self { expr_id, exploring, physical_props_builder, required_physical_props } } } @@ -35,12 +46,12 @@ fn top_matches( } } -impl Task for OptimizeExpressionTask { +impl> Task for OptimizeExpressionTask { fn as_any(&self) -> &dyn std::any::Any { self } - fn execute(&self, optimizer: &mut CascadesOptimizer) -> Result>>> { + fn execute(&self, optimizer: &mut CascadesOptimizer) -> Result>>> { let expr = optimizer.get_expr_memoed(self.expr_id); trace!(event = "task_begin", task = "optimize_expr", expr_id = %self.expr_id, expr = %expr); let mut tasks = vec![]; @@ -59,11 +70,13 @@ impl Task for OptimizeExpressionTask { } if top_matches(rule.matcher(), expr.typ.clone(), expr.data.clone()) { tasks.push( - Box::new(ApplyRuleTask::new(rule_id, self.expr_id, self.exploring)) - as Box>, + Box::new(ApplyRuleTask::new(rule_id, self.expr_id, self.exploring, self.physical_props_builder.clone(), self.required_physical_props.clone())) + as Box>, ); for &input_group_id in &expr.children { - tasks.push(Box::new(ExploreGroupTask::new(input_group_id)) as Box>); + // Explore the whole group instead of the specigic SubGroup the expr children points to + // As explore task is for logical transformations + tasks.push(Box::new(ExploreGroupTask::new(input_group_id.0, self.physical_props_builder.clone(), self.physical_props_builder.any())) as Box>); } } } diff --git a/optd-core/src/cascades/tasks/optimize_group.rs b/optd-core/src/cascades/tasks/optimize_group.rs index 14c3330f..a365ab4f 100644 --- a/optd-core/src/cascades/tasks/optimize_group.rs +++ b/optd-core/src/cascades/tasks/optimize_group.rs @@ -1,5 +1,6 @@ use anyhow::Result; use tracing::trace; +use std::sync::Arc; use crate::{ cascades::{ @@ -7,46 +8,55 @@ use crate::{ tasks::{optimize_expression::OptimizeExpressionTask, OptimizeInputsTask}, CascadesOptimizer, }, + physical_prop::PhysicalPropsBuilder, rel_node::RelNodeTyp, }; use super::Task; -pub struct OptimizeGroupTask { +/// OptimizeGroupTask calls +/// 1. OptimizeExpressionTask for all logical expressions in the group +/// 2. OptimizeInputsTask for all physical expressions in the group +/// For required physical properties, it passes them to OptimizeInputTask and OptimizeExpressionTask +pub struct OptimizeGroupTask> { group_id: GroupId, + physical_props_builder: Arc

, + required_physical_props: P::PhysicalProps, } -impl OptimizeGroupTask { - pub fn new(group_id: GroupId) -> Self { - Self { group_id } +impl> OptimizeGroupTask { + pub fn new(group_id: GroupId, physical_props_builder: Arc

, required_physical_props: P::PhysicalProps) -> Self { + Self { group_id, physical_props_builder, required_physical_props } } } -impl Task for OptimizeGroupTask { +impl> Task for OptimizeGroupTask { fn as_any(&self) -> &dyn std::any::Any { self } - fn execute(&self, optimizer: &mut CascadesOptimizer) -> Result>>> { + fn execute(&self, optimizer: &mut CascadesOptimizer) -> Result>>> { trace!(event = "task_begin", task = "optimize_group", group_id = %self.group_id); - let group_info = optimizer.get_group_info(self.group_id); - if group_info.winner.is_some() { + + let group_info = optimizer.get_sub_group_info_by_props(self.group_id, &self.required_physical_props); + if group_info.is_some() && group_info.unwrap().winner.is_some() { trace!(event = "task_finish", task = "optimize_group"); return Ok(vec![]); } - let exprs = optimizer.get_all_exprs_in_group(self.group_id); + let mut tasks = vec![]; + let exprs = optimizer.get_all_exprs_in_group(self.group_id); let exprs_cnt = exprs.len(); for &expr in &exprs { let typ = optimizer.get_expr_memoed(expr).typ.clone(); if typ.is_logical() { - tasks.push(Box::new(OptimizeExpressionTask::new(expr, false)) as Box>); + tasks.push(Box::new(OptimizeExpressionTask::new(expr, false, self.physical_props_builder.clone(), self.required_physical_props.clone())) as Box>); } } for &expr in &exprs { let typ = optimizer.get_expr_memoed(expr).typ.clone(); if !typ.is_logical() { - tasks.push(Box::new(OptimizeInputsTask::new(expr, true)) as Box>); + tasks.push(Box::new(OptimizeInputsTask::new(expr, true, self.physical_props_builder.clone(), self.required_physical_props.clone())) as Box>); } } trace!(event = "task_finish", task = "optimize_group", group_id = %self.group_id, exprs_cnt = exprs_cnt); diff --git a/optd-core/src/cascades/tasks/optimize_inputs.rs b/optd-core/src/cascades/tasks/optimize_inputs.rs index 7c84a28b..f2e8b811 100644 --- a/optd-core/src/cascades/tasks/optimize_inputs.rs +++ b/optd-core/src/cascades/tasks/optimize_inputs.rs @@ -1,15 +1,14 @@ use anyhow::Result; use tracing::trace; +use std::sync::Arc; use crate::{ cascades::{ - memo::{GroupInfo, Winner}, + memo::{RelMemoNode, SubGroupInfo, Winner}, optimizer::ExprId, tasks::OptimizeGroupTask, - CascadesOptimizer, GroupId, RelNodeContext, - }, - cost::Cost, - rel_node::RelNodeTyp, + CascadesOptimizer, GroupId, RelNodeContext, SubGroupId + }, cost::Cost, physical_prop::PhysicalPropsBuilder, rel_node::RelNodeTyp }; use super::Task; @@ -21,18 +20,37 @@ struct ContinueTask { return_from_optimize_group: bool, } -pub struct OptimizeInputsTask { +/// OptimizeInputsTask calls OptimizeGroupTask for each child of the current expression. +/// It is the only task that move expressions to sub groups from the default subgroup. +/// +/// If there's no required physical props(PhysicalProps::Any), it only updates the winner +/// in the default subgroup. +/// +/// If there's required physical props, +/// 1. After pass child physical properties, it update the winner in the default sub group first +/// 2. it then create the counterpart expr which satisfy required physical props, and move it to subgroup +pub struct OptimizeInputsTask> { expr_id: ExprId, continue_from: Option, pruning: bool, + physical_props_builder: Arc

, + required_physical_props: P::PhysicalProps, + required_children_props: Option>, + pass_to_children_props: Option, + required_enforce_props: Option, } -impl OptimizeInputsTask { - pub fn new(expr_id: ExprId, pruning: bool) -> Self { +impl> OptimizeInputsTask { + pub fn new(expr_id: ExprId, pruning: bool, physical_props_builder: Arc

, required_physical_props: P::PhysicalProps) -> Self { Self { expr_id, continue_from: None, pruning, + physical_props_builder, + required_physical_props, + required_children_props: None, + pass_to_children_props: None, + required_enforce_props: None } } @@ -41,24 +59,35 @@ impl OptimizeInputsTask { expr_id: self.expr_id, continue_from: Some(cont), pruning, + physical_props_builder: self.physical_props_builder.clone(), + required_physical_props: self.required_physical_props.clone(), + required_children_props: self.required_children_props.clone(), + pass_to_children_props: self.pass_to_children_props.clone(), + required_enforce_props: self.required_enforce_props.clone() } } /// first invoke of this task, compute the cost of children - fn first_invoke( + fn first_invoke( &self, - children: &[GroupId], - optimizer: &mut CascadesOptimizer, + children: &[(GroupId, SubGroupId)], + required_children_props: &Vec, + optimizer: &mut CascadesOptimizer, ) -> Vec { let zero_cost = optimizer.cost().zero(); let mut input_cost = Vec::with_capacity(children.len()); - for &child in children.iter() { - let group = optimizer.get_group_info(child); - if let Some(ref winner) = group.winner { - if !winner.impossible { - // the full winner case - input_cost.push(winner.cost.clone()); - continue; + for (&child, &ref prop) in children.iter().zip(required_children_props.iter()) { + // when optimize input task is first invoked, all the children are in default subgroup + assert!(child.1 == SubGroupId(0)); + let group = optimizer.get_sub_group_info_by_props(child.0, &prop); + if let Some(group) = group { + if group.winner.is_some() { + let winner = group.winner.unwrap(); + if !winner.impossible { + // the full winner case + input_cost.push(winner.cost.clone()); + continue; + } } } input_cost.push(zero_cost.clone()); @@ -86,10 +115,12 @@ impl OptimizeInputsTask { false } - fn update_winner( + fn update_winner( &self, cost_so_far: &Cost, - optimizer: &mut CascadesOptimizer, + optimizer: &mut CascadesOptimizer, + physical_prop: Option, + expr_id: Option, ) { let group_id = optimizer.get_group_id(self.expr_id); let group_info = optimizer.get_group_info(group_id); @@ -102,9 +133,25 @@ impl OptimizeInputsTask { update_cost = true; } if update_cost { + if physical_prop.is_some() { + optimizer.update_sub_group_info( + group_id, + expr_id, + SubGroupInfo { + winner: Some(Winner { + impossible: false, + expr_id: expr_id.unwrap(), + cost: cost_so_far.clone(), + }), + }, + &physical_prop.unwrap(), + ); + return; + } + optimizer.update_group_info( group_id, - GroupInfo { + SubGroupInfo { winner: Some(Winner { impossible: false, expr_id: self.expr_id, @@ -114,19 +161,45 @@ impl OptimizeInputsTask { ); } } + + fn create_counterpart_expr(&self, optimizer: &mut CascadesOptimizer, expr: Arc>) -> ExprId{ + let children_group_ids = &expr.children; + let mut changed = false; + let mut new_children_group_ids = Vec::with_capacity(children_group_ids.len()); + for (group_id, required_props) in children_group_ids.iter().zip(self.required_children_props.clone().unwrap().iter()){ + let group_id = group_id.0; + let sub_group_id = optimizer.get_sub_group_id(group_id, &required_props).unwrap(); + if sub_group_id.0 != 0{ + changed = true; + } + new_children_group_ids.push((group_id, sub_group_id)); + } + if changed { + let new_expr = RelMemoNode { + typ: expr.typ.clone(), + data: expr.data.clone(), + children: new_children_group_ids, + }; + let group_id = optimizer.get_group_id(self.expr_id); + // add new expr to sub group + return optimizer.add_sub_group_expr(new_expr, group_id, self.pass_to_children_props.as_ref().unwrap()); + } + self.expr_id + } } -impl Task for OptimizeInputsTask { +impl> Task for OptimizeInputsTask { fn as_any(&self) -> &dyn std::any::Any { self } - fn execute(&self, optimizer: &mut CascadesOptimizer) -> Result>>> { + fn execute(&self, optimizer: &mut CascadesOptimizer) -> Result>>> { if optimizer.tasks.iter().any(|t| { if let Some(task) = t.as_any().downcast_ref::() { - // skip optimize_inputs to avoid dead-loop: consider join commute being fired twice that produces - // two projections, therefore having groups like projection1 -> projection2 -> join = projection1. - task.expr_id == self.expr_id + task.expr_id == self.expr_id + && task.required_physical_props == self.required_physical_props + && task.required_children_props == self.required_children_props + && task.required_enforce_props == self.required_enforce_props } else { false } @@ -169,30 +242,37 @@ impl Task for OptimizeInputsTask { return Ok(vec![]); } if next_group_idx < children_group_ids.len() { - let group_id = children_group_ids[next_group_idx]; + // all the expr for OptimizeInputTask are come from default subgroup, their children point to default sub group id + // we don't need the children subgroup id then + // instead, we use the required_children_props to get the children sub group info + let group_id = children_group_ids[next_group_idx].0; let group_idx = next_group_idx; - let group_info = optimizer.get_group_info(group_id); + let required_child_physical_props = &>::PhysicalProps>> as Clone>::clone(&self.required_children_props).unwrap()[group_idx]; + let sub_group_info = optimizer.get_sub_group_info_by_props(group_id, &required_child_physical_props); let mut has_full_winner = false; - if let Some(ref winner) = group_info.winner { - if !winner.impossible { - input_cost[group_idx] = winner.cost.clone(); - has_full_winner = true; - if self.should_terminate( - cost.sum( - &cost.compute_cost( - &expr.typ, - &expr.data, + if let Some(sub_group_info) = sub_group_info { + if sub_group_info.winner.is_some(){ + let winner = sub_group_info.winner.unwrap(); + if !winner.impossible { + input_cost[group_idx] = winner.cost.clone(); + has_full_winner = true; + if self.should_terminate( + cost.sum( + &cost.compute_cost( + &expr.typ, + &expr.data, + &input_cost, + Some(context.clone()), + Some(optimizer), + ), &input_cost, - Some(context.clone()), - Some(optimizer), - ), - &input_cost, - ) - .0[0], - optimizer.ctx.upper_bound, - ) { - trace!(event = "task_finish", task = "optimize_inputs", expr_id = %self.expr_id); - return Ok(vec![]); + ) + .0[0], + optimizer.ctx.upper_bound, + ) { + trace!(event = "task_finish", task = "optimize_inputs", expr_id = %self.expr_id); + return Ok(vec![]); + } } } } @@ -207,33 +287,41 @@ impl Task for OptimizeInputsTask { return_from_optimize_group: true, }, self.pruning, - )) as Box>, - Box::new(OptimizeGroupTask::new(group_id)) as Box>, + )) as Box>, + Box::new(OptimizeGroupTask::new(group_id, self.physical_props_builder.clone(), required_child_physical_props.clone())) as Box>, ]); } else { - if let Some(ref winner) = group_info.winner { - if winner.impossible { - optimizer.update_group_info( - group_id, - GroupInfo { - winner: Some(Winner { - impossible: true, - ..Default::default() - }), - }, - ); - trace!(event = "task_finish", task = "optimize_inputs", expr_id = %self.expr_id); - return Ok(vec![]); + let sub_group_info = optimizer.get_sub_group_info_by_props(group_id, &required_child_physical_props); + if let Some(sub_group_info) = sub_group_info{ + if sub_group_info.winner.is_some(){ + let winner = sub_group_info.winner.unwrap(); + if winner.impossible { + optimizer.update_sub_group_info( + group_id, + None, // No need to add this expr to the subgroup, as this expr cannot provide required physical props + SubGroupInfo { + winner: Some(Winner { + impossible: true, + ..Default::default() + }), + }, + required_child_physical_props, + ); + trace!(event = "task_finish", task = "optimize_inputs", expr_id = %self.expr_id); + return Ok(vec![]); + } } } - optimizer.update_group_info( + optimizer.update_sub_group_info( group_id, - GroupInfo { + None, + SubGroupInfo { winner: Some(Winner { impossible: true, ..Default::default() }), }, + required_child_physical_props, ); trace!(event = "task_finish", task = "optimize_inputs", expr_id = %self.expr_id); return Ok(vec![]); @@ -247,35 +335,142 @@ impl Task for OptimizeInputsTask { return_from_optimize_group: false, }, self.pruning, - )) as Box>]) + )) as Box>]) } else { - self.update_winner( - &cost.sum( + let cost_so_far = cost.sum( &cost.compute_cost( - &expr.typ, - &expr.data, + &expr.typ, + &expr.data, &input_cost, Some(context.clone()), Some(optimizer), ), &input_cost, - ), + ); + // 1. finish optimizing all the children, let's update the winner for the default sub group first + self.update_winner( + &cost_so_far, optimizer, + None, + None ); + + // 2. create counterpart expr based on required child physical prop + // which will create a sub group with pass_to_child_props in current group + // having children sub groups satifying required children physical prop + let mut sub_group_id = SubGroupId(0); + if self.pass_to_children_props.is_some(){ + let pass_to_children_props = self.pass_to_children_props.clone().unwrap(); + if !self.physical_props_builder.is_any(&pass_to_children_props){ + let counterpart_expr_id = self.create_counterpart_expr(optimizer, expr); + sub_group_id = optimizer.get_sub_group_id(group_id, &pass_to_children_props).unwrap(); + self.update_winner( + &cost_so_far, + optimizer, + Some(pass_to_children_props), + Some(counterpart_expr_id) + ); + } + } + + // 3. start enforcer task to enforce the required physical props + if self.required_enforce_props.is_some() { + let required_enforcer_props = self.required_enforce_props.clone().unwrap(); + if !self.physical_props_builder.is_any(&required_enforcer_props){ + // enforce start enforce operator based on the winner of (group_id, sub_group_id) + let winner_info = optimizer.get_sub_group_info_by_id(group_id, sub_group_id).winner.unwrap(); + let winner_expr_id = winner_info.expr_id; + let winner_cost = winner_info.cost; + + let winner_expr = optimizer.get_all_expr_bindings(winner_expr_id, Some(0)); + assert!(winner_expr.len() == 1); + let winner_expr = winner_expr.get(0).unwrap(); + + // TODO: we might need to add a match and pick here to create RelNodeRef based on RelMemoNodeRef + let new_expr = self.physical_props_builder.enforce(winner_expr.clone(), &required_enforcer_props); + + let enforcer_cost = cost.sum(&cost.compute_cost( + &new_expr.typ, + &new_expr.data, + &[winner_cost.clone()], + Some(context.clone()), + Some(optimizer), + ), + &[winner_cost]); + + let new_expr_memo = RelMemoNode { + typ: new_expr.typ.clone(), + data: new_expr.data.clone(), + children: vec![(group_id, sub_group_id)], + }; + // here we use required_physical_props because the base expr provides the pass_to_children_props and enforcer provides the required_enforce_props + // they together provides the required_physical_props + let new_expr_id = optimizer.add_sub_group_expr(new_expr_memo, group_id, &self.required_physical_props); + self.update_winner( + &enforcer_cost, + optimizer, + Some(self.required_physical_props.clone()), + Some(new_expr_id) + ); + } + } + trace!(event = "task_finish", task = "optimize_inputs", expr_id = %self.expr_id); Ok(vec![]) } } else { - let input_cost = self.first_invoke(children_group_ids, optimizer); + // 1. if there's no required physical props, we make pass_to_children_props as any and required_enforce_props as any + if self.physical_props_builder.is_any(&self.required_physical_props){ + let pass_to_children_props = Some(self.physical_props_builder.any()); + let required_enforce_props = Some(self.physical_props_builder.any()); + let required_children_props = vec![self.physical_props_builder.any(); children_group_ids.len()]; + let input_cost = self.first_invoke(children_group_ids, &required_children_props, optimizer); + trace!(event = "task_yield", task = "optimize_inputs", expr_id = %self.expr_id); + return Ok(vec![Box::new(OptimizeInputsTask::{ + expr_id: self.expr_id, + continue_from: Some(ContinueTask { + next_group_idx: 0, + input_cost, + return_from_optimize_group: false, + }), + pruning: self.pruning, + physical_props_builder: self.physical_props_builder.clone(), + required_physical_props: self.required_physical_props.clone(), + required_enforce_props: required_enforce_props, + required_children_props: Some(required_children_props), + pass_to_children_props: pass_to_children_props, + }) as Box>]); + } + + // separate the physical properties for the current expr gives us a vector of (pass_to_children_props, required_enforce_props, required_children_props) + // 1. for situation that current expr cannot provide any of the required physical props, we set others as any and put all required to required_enforce_props + // 2. for situation that expr can pass requirement to children, we separate required_props to pass_to_children_props and required_enforce_props + // 3. for situation that expr can provide the required physical props by its own(sort merge join to provide ordering), we set pass_to_children_props to any and required_enforce_props to any + let props = self.physical_props_builder.separate_physical_props(&expr.typ, &expr.data, &self.required_physical_props, children_group_ids.len()); + + let mut tasks = Vec::with_capacity(props.len()); + for (pass_to_children_props, required_enforce_prop, required_children_props) in props.into_iter(){ + let input_cost = self.first_invoke(children_group_ids, &required_children_props, optimizer); + tasks.push( + Box::new(OptimizeInputsTask::{ + expr_id: self.expr_id, + continue_from: Some(ContinueTask{ + next_group_idx: 0, + input_cost: input_cost, + return_from_optimize_group: false + }), + pruning: self.pruning, + physical_props_builder: self.physical_props_builder.clone(), + required_physical_props: self.required_physical_props.clone(), + required_enforce_props: Some(required_enforce_prop), + required_children_props: Some(required_children_props), + pass_to_children_props: Some(pass_to_children_props), // Add a semicolon here + }) as Box> + ); + } + trace!(event = "task_yield", task = "optimize_inputs", expr_id = %self.expr_id); - Ok(vec![Box::new(self.continue_from( - ContinueTask { - next_group_idx: 0, - input_cost, - return_from_optimize_group: false, - }, - self.pruning, - )) as Box>]) + Ok(tasks) } } diff --git a/optd-core/src/cost.rs b/optd-core/src/cost.rs index 8d387a61..d522e109 100644 --- a/optd-core/src/cost.rs +++ b/optd-core/src/cost.rs @@ -1,12 +1,13 @@ use crate::{ cascades::{CascadesOptimizer, RelNodeContext}, rel_node::{RelNode, RelNodeTyp, Value}, + physical_prop::PhysicalPropsBuilder, }; #[derive(Default, Clone, Debug, PartialOrd, PartialEq)] pub struct Cost(pub Vec); -pub trait CostModel: 'static + Send + Sync { +pub trait CostModel>: 'static + Send + Sync { fn compute_cost( &self, node: &T, @@ -14,7 +15,7 @@ pub trait CostModel: 'static + Send + Sync { children: &[Cost], context: Option, // one reason we need the optimizer is to traverse children nodes to build up an expression tree - optimizer: Option<&CascadesOptimizer>, + optimizer: Option<&CascadesOptimizer>, ) -> Cost; fn compute_plan_node_cost(&self, node: &RelNode) -> Cost; diff --git a/optd-core/src/lib.rs b/optd-core/src/lib.rs index 363539c2..d2bf9135 100644 --- a/optd-core/src/lib.rs +++ b/optd-core/src/lib.rs @@ -7,3 +7,4 @@ pub mod optimizer; pub mod property; pub mod rel_node; pub mod rules; +pub mod physical_prop; diff --git a/optd-core/src/physical_prop.rs b/optd-core/src/physical_prop.rs new file mode 100644 index 00000000..c93c8a08 --- /dev/null +++ b/optd-core/src/physical_prop.rs @@ -0,0 +1,48 @@ +use std::hash::Hash; +use std::cmp::{Eq, PartialEq}; +use std::fmt::Debug; +use crate::rel_node::{RelNodeTyp, Value, RelNodeRef}; + +pub trait PhysicalPropsBuilder: 'static + Send + Sync{ + + type PhysicalProps: 'static + Send + Sync + Sized + Clone + Debug + Eq + PartialEq + Hash; + + fn new() -> Self; + + fn names(&self, props: &Self::PhysicalProps) -> Vec<&'static str>; + + fn is_any(&self, props: &Self::PhysicalProps) -> bool; + + fn any(&self) -> Self::PhysicalProps; + + fn can_provide( + &self, + typ: &T, + data: &Option, + required: &Self::PhysicalProps + ) -> bool; + + fn build_children_properties( + &self, + typ: &T, + data: &Option, + children_len: usize, + required: &Self::PhysicalProps + ) -> Vec; + + fn enforce( + &self, + expr: RelNodeRef, + required: &Self::PhysicalProps + ) -> RelNodeRef; + + // separate physical properties to pass_to_children prop and enforcer prop + // pass_to_children prop are further separated to each child + fn separate_physical_props( + &self, + typ: &T, + data: &Option, + required: &Self::PhysicalProps, + children_len: usize, + ) -> Vec<(Self::PhysicalProps, Self::PhysicalProps, Vec)>; +} \ No newline at end of file diff --git a/optd-core/src/rel_node.rs b/optd-core/src/rel_node.rs index a140ebf7..d14ca39f 100644 --- a/optd-core/src/rel_node.rs +++ b/optd-core/src/rel_node.rs @@ -11,7 +11,7 @@ use std::{ use ordered_float::OrderedFloat; use serde::{Deserialize, Deserializer, Serialize, Serializer}; -use crate::{cascades::GroupId, cost::Cost}; +use crate::{cascades::{GroupId, SubGroupId}, cost::Cost}; pub type RelNodeRef = Arc>; @@ -20,10 +20,12 @@ pub trait RelNodeTyp: { fn is_logical(&self) -> bool; - fn group_typ(group_id: GroupId) -> Self; + fn group_typ(group_id: GroupId, sub_group_id: SubGroupId) -> Self; fn extract_group(&self) -> Option; + fn extract_group_and_sub_group(&self) -> Option<(GroupId, SubGroupId)>; + fn list_typ() -> Self; } @@ -222,8 +224,8 @@ impl RelNode { } } - pub fn new_group(group_id: GroupId) -> Self { - Self::new_leaf(T::group_typ(group_id)) + pub fn new_group(group_id: GroupId, sub_group_id: SubGroupId) -> Self { + Self::new_leaf(T::group_typ(group_id, sub_group_id)) } pub fn new_list(items: Vec>) -> Self { @@ -250,5 +252,6 @@ impl RelNodeMeta { } } +// TODO(avery): we might want to redesign this to make it align with Subgroups /// A hash table storing `RelNode` (memory address, metadata) pairs. pub type RelNodeMetaMap = HashMap; diff --git a/optd-datafusion-bridge/src/lib.rs b/optd-datafusion-bridge/src/lib.rs index e6413653..df0bf244 100644 --- a/optd-datafusion-bridge/src/lib.rs +++ b/optd-datafusion-bridge/src/lib.rs @@ -29,6 +29,7 @@ use std::{ collections::{BTreeSet, HashMap}, sync::{Arc, Mutex}, }; +use optd_core::cascades::SubGroupId; pub struct OptdPlanContext<'a> { tables: HashMap>, @@ -267,9 +268,10 @@ impl OptdQueryPlanner { "None".to_string() }, )); + // TODO(avery):use real required root props corresponding sub_group_id let bindings = optimizer .optd_cascades_optimizer() - .get_all_group_bindings(group_id, true); + .get_all_group_bindings(group_id, SubGroupId(0), true); let mut join_orders = BTreeSet::new(); let mut logical_join_orders = BTreeSet::new(); for binding in bindings { diff --git a/optd-datafusion-repr/src/bin/test_optimize.rs b/optd-datafusion-repr/src/bin/test_optimize.rs index eb7a80a1..728f69b4 100644 --- a/optd-datafusion-repr/src/bin/test_optimize.rs +++ b/optd-datafusion-repr/src/bin/test_optimize.rs @@ -5,15 +5,14 @@ use optd_core::{ heuristics::HeuristicsOptimizer, optimizer::Optimizer, rel_node::Value, + physical_prop::PhysicalPropsBuilder, rules::{Rule, RuleWrapper}, }; use optd_datafusion_repr::{ - cost::{base_cost::DataFusionPerTableStats, OptCostModel}, - plan_nodes::{ + cost::{base_cost::DataFusionPerTableStats, OptCostModel}, physical_properties::{self, PhysicalPropsBuilderImpl}, plan_nodes::{ BinOpExpr, BinOpType, ColumnRefExpr, ConstantExpr, JoinType, LogicalFilter, LogicalJoin, LogicalScan, OptRelNode, OptRelNodeTyp, PlanNode, - }, - rules::{HashJoinRule, JoinAssocRule, JoinCommuteRule, PhysicalConversionRule}, + }, rules::{HashJoinRule, JoinAssocRule, JoinCommuteRule, PhysicalConversionRule} }; use tracing::Level; @@ -25,7 +24,7 @@ pub fn main() { .with_target(false) .init(); - let rules: Vec>>> = vec![ + let rules: Vec>>> = vec![ Arc::new(JoinCommuteRule::new()), Arc::new(JoinAssocRule::new()), Arc::new(PhysicalConversionRule::new(OptRelNodeTyp::Scan)), @@ -40,6 +39,8 @@ pub fn main() { rule_wrappers.push(RuleWrapper::new_cascades(rule)); } + let physical_properties_builder = PhysicalPropsBuilderImpl::new(); + let required_root_props = physical_properties_builder.any(); let mut optimizer = CascadesOptimizer::new( rule_wrappers, Box::new(OptCostModel::new( @@ -49,6 +50,8 @@ pub fn main() { .collect(), )), vec![], + Arc::new(physical_properties_builder), + required_root_props, ); // The plan: (filter (scan t1) #1=2) join (scan t2) join (scan t3) @@ -65,7 +68,7 @@ pub fn main() { let join_filter = LogicalJoin::new(filter1.0, scan2.0, join_cond.clone().0, JoinType::Inner); let fnal = LogicalJoin::new(scan3.0, join_filter.0, join_cond.0, JoinType::Inner); let node = optimizer.optimize(fnal.0.clone().into_rel_node()); - optimizer.dump(None); + optimizer.dump(None, None); let node: Arc> = node.unwrap(); println!( "cost={}", diff --git a/optd-datafusion-repr/src/cost/adaptive_cost.rs b/optd-datafusion-repr/src/cost/adaptive_cost.rs index dd29edc6..80e904b3 100644 --- a/optd-datafusion-repr/src/cost/adaptive_cost.rs +++ b/optd-datafusion-repr/src/cost/adaptive_cost.rs @@ -1,13 +1,10 @@ use std::{ - collections::HashMap, - sync::{Arc, Mutex}, + collections::HashMap, io::Empty, sync::{Arc, Mutex} }; -use crate::{cost::OptCostModel, plan_nodes::OptRelNodeTyp}; +use crate::{cost::OptCostModel, plan_nodes::OptRelNodeTyp, physical_properties::EmptyPhysicalPropsBuilder}; use optd_core::{ - cascades::{CascadesOptimizer, GroupId, RelNodeContext}, - cost::{Cost, CostModel}, - rel_node::{RelNode, Value}, + cascades::{CascadesOptimizer, GroupId, RelNodeContext}, cost::{Cost, CostModel}, physical_prop::PhysicalPropsBuilder, rel_node::{RelNode, Value} }; use super::base_cost::{ @@ -33,7 +30,7 @@ pub struct AdaptiveCostModel { decay: usize, } -impl CostModel for AdaptiveCostModel { +impl CostModel for AdaptiveCostModel { fn explain(&self, cost: &Cost) -> String { self.base_model.explain(cost) } @@ -52,7 +49,7 @@ impl CostModel for Adaptive data: &Option, children: &[Cost], context: Option, - optimizer: Option<&CascadesOptimizer>, + optimizer: Option<&CascadesOptimizer>, ) -> Cost { if let OptRelNodeTyp::PhysicalScan = node { let guard = self.runtime_row_cnt.lock().unwrap(); diff --git a/optd-datafusion-repr/src/cost/base_cost.rs b/optd-datafusion-repr/src/cost/base_cost.rs index 5fc0fb1c..2ff73094 100644 --- a/optd-datafusion-repr/src/cost/base_cost.rs +++ b/optd-datafusion-repr/src/cost/base_cost.rs @@ -1,5 +1,6 @@ use std::{collections::HashMap, sync::Arc}; +use crate::physical_properties::EmptyPhysicalPropsBuilder; use crate::plan_nodes::{ BinOpType, ColumnRefExpr, ConstantExpr, ConstantType, Expr, ExprList, LogOpExpr, LogOpType, OptRelNode, UnOpType, @@ -20,13 +21,14 @@ use optd_core::{ cascades::{CascadesOptimizer, RelNodeContext}, cost::{Cost, CostModel}, rel_node::{RelNode, RelNodeTyp, Value}, + physical_prop::PhysicalPropsBuilder }; use optd_gungnir::stats::hyperloglog::{self, HyperLogLog}; use optd_gungnir::stats::tdigest::{self, TDigest}; use optd_gungnir::utils::arith_encoder; use serde::{Deserialize, Serialize}; -fn compute_plan_node_cost>( +fn compute_plan_node_cost, C: CostModel>( model: &C, node: &RelNode, total_cost: &mut Cost, @@ -378,7 +380,7 @@ impl OptCostModel { } } -impl CostModel for OptCostModel { +impl CostModel for OptCostModel { fn explain(&self, cost: &Cost) -> String { format!( "weighted={},row_cnt={},compute={},io={}", @@ -410,7 +412,7 @@ impl CostModel for OptCostM data: &Option, children: &[Cost], context: Option, - optimizer: Option<&CascadesOptimizer>, + optimizer: Option<&CascadesOptimizer>, ) -> Cost { match node { OptRelNodeTyp::PhysicalScan => { @@ -423,7 +425,7 @@ impl CostModel for OptCostM let (row_cnt, compute_cost, _) = Self::cost_tuple(&children[0]); let row_cnt = if let (Some(context), Some(optimizer)) = (context, optimizer) { let mut fetch_expr = - optimizer.get_all_group_bindings(context.children_group_ids[2], false); + optimizer.get_all_group_bindings(context.children_group_ids[2].0, context.children_group_ids[2].1, false); assert!( fetch_expr.len() == 1, "fetch expression should be the only expr in the group" @@ -458,7 +460,7 @@ impl CostModel for OptCostM let column_refs = optimizer .get_property_by_group::(context.group_id, 1); let expr_group_id = context.children_group_ids[1]; - let expr_trees = optimizer.get_all_group_bindings(expr_group_id, false); + let expr_trees = optimizer.get_all_group_bindings(expr_group_id.0, expr_group_id.1, false); // there may be more than one expression tree in a group (you can see this trivially as you can just swap the order of two subtrees for commutative operators) // however, we just take an arbitrary expression tree from the group to compute selectivity let expr_tree = expr_trees.first().expect("expression missing"); @@ -480,7 +482,7 @@ impl CostModel for OptCostM let column_refs = optimizer .get_property_by_group::(context.group_id, 1); let expr_group_id = context.children_group_ids[2]; - let expr_trees = optimizer.get_all_group_bindings(expr_group_id, false); + let expr_trees = optimizer.get_all_group_bindings(expr_group_id.0, expr_group_id.1, false); // there may be more than one expression tree in a group. see comment in OptRelNodeTyp::PhysicalFilter(_) for more information let expr_tree = expr_trees.first().expect("expression missing"); self.get_join_selectivity_from_expr_tree( @@ -513,9 +515,9 @@ impl CostModel for OptCostM let left_keys_group_id = context.children_group_ids[2]; let right_keys_group_id = context.children_group_ids[3]; let left_keys_list = - optimizer.get_all_group_bindings(left_keys_group_id, false); + optimizer.get_all_group_bindings(left_keys_group_id.0, left_keys_group_id.1, false); let right_keys_list = - optimizer.get_all_group_bindings(right_keys_group_id, false); + optimizer.get_all_group_bindings(right_keys_group_id.0, right_keys_group_id.1, false); // there may be more than one expression tree in a group. see comment in OptRelNodeTyp::PhysicalFilter(_) for more information let left_keys = left_keys_list.first().expect("left keys missing"); let right_keys = right_keys_list.first().expect("right keys missing"); @@ -597,13 +599,13 @@ impl OptCostModel { fn get_agg_row_cnt( &self, context: Option, - optimizer: Option<&CascadesOptimizer>, + optimizer: Option<&CascadesOptimizer>, child_row_cnt: f64, ) -> f64 { if let (Some(context), Some(optimizer)) = (context, optimizer) { let group_by_id = context.children_group_ids[2]; let mut group_by_exprs: Vec>> = - optimizer.get_all_group_bindings(group_by_id, false); + optimizer.get_all_group_bindings(group_by_id.0, group_by_id.1, false); assert!( group_by_exprs.len() == 1, "ExprList expression should be the only expression in the GROUP BY group" diff --git a/optd-datafusion-repr/src/lib.rs b/optd-datafusion-repr/src/lib.rs index 131015e2..1b8cee36 100644 --- a/optd-datafusion-repr/src/lib.rs +++ b/optd-datafusion-repr/src/lib.rs @@ -8,14 +8,16 @@ use cost::{ AdaptiveCostModel, BaseTableStats, RuntimeAdaptionStorage, DEFAULT_DECAY, }; use optd_core::{ - cascades::{CascadesOptimizer, GroupId, OptimizerProperties}, + cascades::{CascadesOptimizer, GroupId, SubGroupId, OptimizerProperties}, heuristics::{ApplyOrder, HeuristicsOptimizer}, optimizer::Optimizer, property::PropertyBuilderAny, + physical_prop::PhysicalPropsBuilder, rel_node::RelNodeMetaMap, rules::{Rule, RuleWrapper}, }; +use physical_properties::PhysicalPropsBuilderImpl; use plan_nodes::{OptRelNodeRef, OptRelNodeTyp}; use properties::{ column_ref::ColumnRefPropertyBuilder, @@ -34,10 +36,11 @@ mod explain; pub mod plan_nodes; pub mod properties; pub mod rules; +pub mod physical_properties; pub struct DatafusionOptimizer { hueristic_optimizer: HeuristicsOptimizer, - cascades_optimizer: CascadesOptimizer, + cascades_optimizer: CascadesOptimizer, pub runtime_statistics: RuntimeAdaptionStorage, enable_adaptive: bool, enable_heuristic: bool, @@ -60,7 +63,7 @@ impl DatafusionOptimizer { self.enable_heuristic } - pub fn optd_cascades_optimizer(&self) -> &CascadesOptimizer { + pub fn optd_cascades_optimizer(&self) -> &CascadesOptimizer { &self.cascades_optimizer } @@ -68,7 +71,7 @@ impl DatafusionOptimizer { &self.hueristic_optimizer } - pub fn optd_optimizer_mut(&mut self) -> &mut CascadesOptimizer { + pub fn optd_optimizer_mut(&mut self) -> &mut CascadesOptimizer { &mut self.cascades_optimizer } @@ -86,7 +89,7 @@ impl DatafusionOptimizer { } pub fn default_cascades_rules( - ) -> Vec>>> { + ) -> Vec>>> { let rules = PhysicalConversionRule::all_conversions(); let mut rule_wrappers = vec![]; for rule in rules { @@ -114,6 +117,9 @@ impl DatafusionOptimizer { Box::new(SchemaPropertyBuilder::new(catalog.clone())), Box::new(ColumnRefPropertyBuilder::new(catalog.clone())), ]); + let physical_properties_builder = PhysicalPropsBuilderImpl::new(); + // TODO(avery): add real required_root_props + let required_root_props = physical_properties_builder.any(); let cost_model = AdaptiveCostModel::new(DEFAULT_DECAY, stats); Self { runtime_statistics: cost_model.get_runtime_map(), @@ -124,6 +130,8 @@ impl DatafusionOptimizer { Box::new(SchemaPropertyBuilder::new(catalog.clone())), Box::new(ColumnRefPropertyBuilder::new(catalog.clone())), ], + Arc::new(physical_properties_builder), + required_root_props, OptimizerProperties { partial_explore_iter: Some(1 << 20), partial_explore_space: Some(1 << 10), @@ -170,6 +178,8 @@ impl DatafusionOptimizer { Box::new(SchemaPropertyBuilder::new(catalog.clone())), Box::new(ColumnRefPropertyBuilder::new(catalog)), ], + Arc::new(PhysicalPropsBuilderImpl::new()), + PhysicalPropsBuilderImpl::new().any(), ); Self { runtime_statistics, @@ -204,14 +214,15 @@ impl DatafusionOptimizer { let group_id = self.cascades_optimizer.step_optimize_rel(root_rel)?; let mut meta = Some(HashMap::new()); + // TODO(avery): use real required_root_props let optimized_rel = self .cascades_optimizer - .step_get_optimize_rel(group_id, &mut meta)?; + .step_get_optimize_rel(group_id, PhysicalPropsBuilderImpl::new().any(), &mut meta)?; Ok((group_id, optimized_rel, meta.unwrap())) } - pub fn dump(&self, group_id: Option) { - self.cascades_optimizer.dump(group_id) + pub fn dump(&self, group_id: Option, sub_group_id: Option) { + self.cascades_optimizer.dump(group_id, sub_group_id); } } diff --git a/optd-datafusion-repr/src/physical_properties.rs b/optd-datafusion-repr/src/physical_properties.rs new file mode 100644 index 00000000..766dd5f0 --- /dev/null +++ b/optd-datafusion-repr/src/physical_properties.rs @@ -0,0 +1,84 @@ +use optd_core::{ + rel_node::{RelNodeRef, Value}, + physical_prop::PhysicalPropsBuilder +}; +use crate::OptRelNodeTyp; + +// Use this type to choose the physical properties that are registered for the optimizer +pub type PhysicalPropsBuilderImpl = EmptyPhysicalPropsBuilder; + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +enum EmptyPhysicalPropState{ + Empty, +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct EmptyPhysicalProps(EmptyPhysicalPropState); + +pub struct EmptyPhysicalPropsBuilder{ + state: EmptyPhysicalPropState, +} + +impl PhysicalPropsBuilder for EmptyPhysicalPropsBuilder{ + type PhysicalProps = EmptyPhysicalProps; + + fn new () -> Self{ + EmptyPhysicalPropsBuilder{ + state: EmptyPhysicalPropState::Empty + } + } + + fn names(&self, props: &Self::PhysicalProps) -> Vec<&'static str>{ + vec!["EmptyPhysicalProps"] + } + + fn is_any(&self, props: &Self::PhysicalProps) -> bool{ + match props.0{ + EmptyPhysicalPropState::Empty => true, + } + } + + fn any(&self) -> Self::PhysicalProps{ + EmptyPhysicalProps(EmptyPhysicalPropState::Empty) + } + + fn can_provide( + &self, + typ: &OptRelNodeTyp, + data: &Option, + required: &Self::PhysicalProps + ) -> bool{ + self.is_any(required) + } + + fn build_children_properties( + &self, + typ: &OptRelNodeTyp, + data: &Option, + children_len: usize, + required: &Self::PhysicalProps + ) -> Vec{ + vec![self.any(); children_len] + } + + fn enforce( + &self, + expr: RelNodeRef, + required: &Self::PhysicalProps + ) -> RelNodeRef{ + expr + } + + fn separate_physical_props( + &self, + typ: &OptRelNodeTyp, + data: &Option, + required: &Self::PhysicalProps, + children_len: usize, + ) -> Vec<(Self::PhysicalProps, Self::PhysicalProps, Vec)>{ + let pass_to_children = self.any(); + let enforcer = self.any(); + let children_props = vec![self.any(); children_len]; + vec![(pass_to_children, enforcer, children_props)] + } +} \ No newline at end of file diff --git a/optd-datafusion-repr/src/plan_nodes.rs b/optd-datafusion-repr/src/plan_nodes.rs index e872b3a9..a3d0cfb1 100644 --- a/optd-datafusion-repr/src/plan_nodes.rs +++ b/optd-datafusion-repr/src/plan_nodes.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use arrow_schema::DataType; use optd_core::{ - cascades::{CascadesOptimizer, GroupId}, + cascades::{CascadesOptimizer, GroupId, SubGroupId}, rel_node::{RelNode, RelNodeMeta, RelNodeMetaMap, RelNodeRef, RelNodeTyp}, }; @@ -37,13 +37,13 @@ pub use projection::{LogicalProjection, PhysicalProjection}; pub use scan::{LogicalScan, PhysicalScan}; pub use sort::{LogicalSort, PhysicalSort}; -use crate::properties::schema::{Schema, SchemaPropertyBuilder}; +use crate::{physical_properties::PhysicalPropsBuilderImpl, properties::schema::{Schema, SchemaPropertyBuilder}}; /// OptRelNodeTyp FAQ: /// - The define_plan_node!() macro defines what the children of each join node are #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub enum OptRelNodeTyp { - Placeholder(GroupId), + Placeholder(GroupId, SubGroupId), List, // Plan nodes // Developers: update `is_plan_node` function after adding new elements @@ -147,8 +147,16 @@ impl RelNodeTyp for OptRelNodeTyp { ) } - fn group_typ(group_id: GroupId) -> Self { - Self::Placeholder(group_id) + fn group_typ(group_id: GroupId, sub_group_id: SubGroupId) -> Self { + Self::Placeholder(group_id, sub_group_id) + } + + fn extract_group_and_sub_group(&self) -> Option<(GroupId, SubGroupId)> { + if let Self::Placeholder(group_id, sub_group_id) = self { + Some((*group_id, *sub_group_id)) + } else { + None + } } fn list_typ() -> Self { @@ -156,7 +164,7 @@ impl RelNodeTyp for OptRelNodeTyp { } fn extract_group(&self) -> Option { - if let Self::Placeholder(group_id) = self { + if let Self::Placeholder(group_id, _) = self { Some(*group_id) } else { None @@ -229,7 +237,7 @@ impl PlanNode { self.0.typ.clone() } - pub fn schema(&self, optimizer: &CascadesOptimizer) -> Schema { + pub fn schema(&self, optimizer: &CascadesOptimizer) -> Schema { let group_id = optimizer.resolve_group_id(self.0.clone()); optimizer.get_property_by_group::(group_id, 0) } @@ -356,7 +364,7 @@ pub fn explain(rel_node: OptRelNodeRef, meta_map: Option<&RelNodeMetaMap>) -> Pr OptRelNodeTyp::PhysicalNestedLoopJoin(_) => PhysicalNestedLoopJoin::from_rel_node(rel_node) .unwrap() .dispatch_explain(meta_map), - OptRelNodeTyp::Placeholder(_) => unreachable!("should not explain a placeholder"), + OptRelNodeTyp::Placeholder(_, _) => unreachable!("should not explain a placeholder"), OptRelNodeTyp::List => { ExprList::from_rel_node(rel_node) // ExprList is the only place that we will have list in the datafusion repr .unwrap()