Skip to content
This repository was archived by the owner on Jan 7, 2025. It is now read-only.

[WIP] Physical props #173

Draft
wants to merge 17 commits into
base: main
Choose a base branch
from
Draft
2 changes: 1 addition & 1 deletion optd-core/src/cascades.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@ mod optimizer;
mod tasks;

use memo::Memo;
pub use optimizer::{CascadesOptimizer, GroupId, OptimizerProperties, RelNodeContext};
pub use optimizer::{CascadesOptimizer, GroupId, SubGroupId, OptimizerProperties, RelNodeContext};
use tasks::Task;
346 changes: 297 additions & 49 deletions optd-core/src/cascades/memo.rs

Large diffs are not rendered by default.

161 changes: 118 additions & 43 deletions optd-core/src/cascades/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,13 @@ use std::{
use anyhow::Result;

use crate::{
cost::CostModel,
optimizer::Optimizer,
property::{PropertyBuilder, PropertyBuilderAny},
rel_node::{RelNodeMetaMap, RelNodeRef, RelNodeTyp},
rules::RuleWrapper,
cost::CostModel, optimizer::Optimizer,
physical_prop::PhysicalPropsBuilder,
property::{PropertyBuilder, PropertyBuilderAny}, rel_node::{RelNodeMetaMap, RelNodeRef, RelNodeTyp}, rules::RuleWrapper
};

use super::{
memo::{GroupInfo, RelMemoNodeRef},
memo::{SubGroupInfo, RelMemoNodeRef, RelMemoNode},
tasks::OptimizeGroupTask,
Memo, Task,
};
Expand All @@ -37,15 +35,17 @@ pub struct OptimizerProperties {
pub partial_explore_space: Option<usize>,
}

pub struct CascadesOptimizer<T: RelNodeTyp> {
memo: Memo<T>,
pub(super) tasks: VecDeque<Box<dyn Task<T>>>,
pub struct CascadesOptimizer<T: RelNodeTyp, P: PhysicalPropsBuilder<T>> {
memo: Memo<T, P>,
pub(super) tasks: VecDeque<Box<dyn Task<T,P>>>,
explored_group: HashSet<GroupId>,
fired_rules: HashMap<ExprId, HashSet<RuleId>>,
rules: Arc<[Arc<RuleWrapper<T, Self>>]>,
disabled_rules: HashSet<usize>,
cost: Arc<dyn CostModel<T>>,
cost: Arc<dyn CostModel<T, P>>,
property_builders: Arc<[Box<dyn PropertyBuilderAny<T>>]>,
required_root_props: P::PhysicalProps,
physical_property_builders: Arc<P>,
pub ctx: OptimizerContext,
pub prop: OptimizerProperties,
}
Expand All @@ -56,12 +56,15 @@ pub struct CascadesOptimizer<T: RelNodeTyp> {
pub struct RelNodeContext {
pub group_id: GroupId,
pub expr_id: ExprId,
pub children_group_ids: Vec<GroupId>,
pub children_group_ids: Vec<(GroupId, SubGroupId)>,
}

#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug, Default, Hash)]
pub struct GroupId(pub(super) usize);

#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug, Default, Hash)]
pub struct SubGroupId(pub usize);

#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug, Default, Hash)]
pub struct ExprId(pub usize);

Expand All @@ -71,30 +74,40 @@ impl Display for GroupId {
}
}

impl Display for SubGroupId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "!{}", self.0)
}
}

impl Display for ExprId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}

impl<T: RelNodeTyp> CascadesOptimizer<T> {
impl<T: RelNodeTyp, P: PhysicalPropsBuilder<T>> CascadesOptimizer<T, P> {
pub fn new(
rules: Vec<Arc<RuleWrapper<T, Self>>>,
cost: Box<dyn CostModel<T>>,
cost: Box<dyn CostModel<T,P>>,
property_builders: Vec<Box<dyn PropertyBuilderAny<T>>>,
physical_property_builders: Arc<P>,
required_root_props: P::PhysicalProps,
) -> Self {
Self::new_with_prop(rules, cost, property_builders, Default::default())
Self::new_with_prop(rules, cost, property_builders, physical_property_builders, required_root_props, Default::default())
}

pub fn new_with_prop(
rules: Vec<Arc<RuleWrapper<T, Self>>>,
cost: Box<dyn CostModel<T>>,
cost: Box<dyn CostModel<T,P>>,
property_builders: Vec<Box<dyn PropertyBuilderAny<T>>>,
physical_property_builders: Arc<P>,
required_root_props: P::PhysicalProps,
prop: OptimizerProperties,
) -> Self {
let tasks = VecDeque::new();
let property_builders: Arc<[_]> = property_builders.into();
let memo = Memo::new(property_builders.clone());
let memo = Memo::new(property_builders.clone(), required_root_props.clone(), physical_property_builders.clone());
Self {
memo,
tasks,
Expand All @@ -104,12 +117,14 @@ impl<T: RelNodeTyp> CascadesOptimizer<T> {
cost: cost.into(),
ctx: OptimizerContext::default(),
property_builders,
physical_property_builders,
required_root_props,
prop,
disabled_rules: HashSet::new(),
}
}

pub fn cost(&self) -> Arc<dyn CostModel<T>> {
pub fn cost(&self) -> Arc<dyn CostModel<T, P>> {
self.cost.clone()
}

Expand All @@ -129,30 +144,37 @@ impl<T: RelNodeTyp> CascadesOptimizer<T> {
self.disabled_rules.contains(&rule_id)
}

pub fn dump(&self, group_id: Option<GroupId>) {
pub fn dump(&self, group_id: Option<GroupId>, sub_group_id: Option<SubGroupId>) {
if let Some(group_id) = group_id {
fn dump_inner<T: RelNodeTyp>(this: &CascadesOptimizer<T>, group_id: GroupId) {
if let Some(ref winner) = this.memo.get_group_info(group_id).winner {
fn dump_inner<T: RelNodeTyp, P: PhysicalPropsBuilder<T>>(this: &CascadesOptimizer<T, P>, group_id: GroupId, sub_group_id: Option<SubGroupId>) {
let mut real_sub_group_id = SubGroupId(0);
if sub_group_id.is_some(){
real_sub_group_id = sub_group_id.unwrap();
}
if let Some(ref winner) = this.memo.get_sub_group_info_by_id(group_id, real_sub_group_id).winner {
let expr = this.memo.get_expr_memoed(winner.expr_id);
assert!(!winner.impossible);
if winner.cost.0[1] == 1.0 {
return;
}
println!(
"group_id={} winner={} cost={} {}",
"group_id={} sub_group_id={} winner={} cost={} {}",
group_id,
real_sub_group_id,
winner.expr_id,
this.cost.explain(&winner.cost),
expr
);
for child in &expr.children {
dump_inner(this, *child);
dump_inner(this, child.0, Some(child.1));
}
}
}
dump_inner(self, group_id);
dump_inner(self, group_id, sub_group_id);
return;
}
//TODO(avery): current implementation is that only dump default subgroup when no
// group id is passed. We might add dump for all subgroups in the future.
for group_id in self.memo.get_all_group_ids() {
let winner = if let Some(ref winner) = self.memo.get_group_info(group_id).winner {
if winner.impossible {
Expand Down Expand Up @@ -192,7 +214,7 @@ impl<T: RelNodeTyp> CascadesOptimizer<T> {

/// Clear the memo table and all optimizer states.
pub fn step_clear(&mut self) {
self.memo = Memo::new(self.property_builders.clone());
self.memo = Memo::new(self.property_builders.clone(), self.required_root_props.clone(), self.physical_property_builders.clone());
self.fired_rules.clear();
self.explored_group.clear();
}
Expand All @@ -204,23 +226,24 @@ impl<T: RelNodeTyp> CascadesOptimizer<T> {

/// Optimize a `RelNode`.
pub fn step_optimize_rel(&mut self, root_rel: RelNodeRef<T>) -> Result<GroupId> {
let (group_id, _) = self.add_group_expr(root_rel, None);
self.fire_optimize_tasks(group_id)?;
let (group_id, _) = self.add_group_expr_to_default_sub_group(root_rel, None);
self.fire_optimize_tasks(group_id, self.physical_property_builders.clone(), self.required_root_props.clone())?;
Ok(group_id)
}

/// Gets the group binding.
pub fn step_get_optimize_rel(
&self,
group_id: GroupId,
physical_props: P::PhysicalProps,
meta: &mut Option<RelNodeMetaMap>,
) -> Result<RelNodeRef<T>> {
self.memo.get_best_group_binding(group_id, meta)
self.memo.get_best_group_binding(group_id, physical_props, meta)
}

fn fire_optimize_tasks(&mut self, group_id: GroupId) -> Result<()> {
fn fire_optimize_tasks(&mut self, group_id: GroupId, physical_property_builders: Arc<P>, required_root_props: P::PhysicalProps) -> Result<()> {
self.tasks
.push_back(Box::new(OptimizeGroupTask::new(group_id)));
.push_back(Box::new(OptimizeGroupTask::new(group_id, self.physical_property_builders.clone(), self.required_root_props.clone())));
// get the task from the stack
self.ctx.budget_used = false;
let plan_space_begin = self.memo.compute_plan_space();
Expand Down Expand Up @@ -254,9 +277,9 @@ impl<T: RelNodeTyp> CascadesOptimizer<T> {
}

fn optimize_inner(&mut self, root_rel: RelNodeRef<T>) -> Result<RelNodeRef<T>> {
let (group_id, _) = self.add_group_expr(root_rel, None);
self.fire_optimize_tasks(group_id)?;
self.memo.get_best_group_binding(group_id, &mut None)
let (group_id, _) = self.add_group_expr_to_default_sub_group(root_rel, None);
self.fire_optimize_tasks(group_id, self.physical_property_builders.clone(), self.required_root_props.clone())?;
self.memo.get_best_group_binding(group_id, self.required_root_props.clone(), &mut None)
}

pub fn resolve_group_id(&self, root_rel: RelNodeRef<T>) -> GroupId {
Expand All @@ -275,12 +298,12 @@ impl<T: RelNodeTyp> CascadesOptimizer<T> {
self.memo.get_expr_info(expr)
}

pub(super) fn add_group_expr(
pub(super) fn add_group_expr_to_default_sub_group(
&mut self,
expr: RelNodeRef<T>,
group_id: Option<GroupId>,
) -> (GroupId, ExprId) {
self.memo.add_new_group_expr(expr, group_id)
self.memo.add_new_group_expr_to_default_sub_group(expr, group_id)
}

pub(super) fn replace_group_expr(
Expand All @@ -305,29 +328,80 @@ impl<T: RelNodeTyp> CascadesOptimizer<T> {
});
}

pub(super) fn get_group_info(&self, group_id: GroupId) -> GroupInfo {
pub(super) fn get_group_info(&self, group_id: GroupId) -> SubGroupInfo {
self.memo.get_group_info(group_id)
}

pub(super) fn update_group_info(&mut self, group_id: GroupId, group_info: GroupInfo) {
pub(super) fn update_group_info(&mut self, group_id: GroupId, group_info: SubGroupInfo) {
self.memo.update_group_info(group_id, group_info)
}

pub(super) fn merge_group(&mut self, group_a: GroupId, group_b: GroupId) {
self.memo.merge_group(group_a, group_b);
}

pub(super) fn get_sub_group_info_by_props(
&self,
group_id: GroupId,
physical_props: &P::PhysicalProps,
) -> Option<SubGroupInfo> {
self.memo.get_sub_group_info_by_props(group_id, physical_props)
}

pub(super) fn get_sub_group_info_by_id(
&self,
group_id: GroupId,
sub_group_id: SubGroupId,
) -> SubGroupInfo {
self.memo.get_sub_group_info_by_id(group_id, sub_group_id)
}

pub(super) fn get_sub_group_id(
&self,
group_id: GroupId,
physical_props: &P::PhysicalProps,
) -> Option<SubGroupId> {
self.memo.get_sub_group_id(group_id, physical_props)
}

pub(super) fn add_sub_group_expr(
&mut self,
expr: RelMemoNode<T>,
group_id: GroupId,
physical_props: &P::PhysicalProps,
) -> ExprId {
self.memo.add_sub_group_expr(expr, group_id, physical_props)
}

pub(super) fn update_sub_group_info(
&mut self,
group_id: GroupId,
expr_id: Option<ExprId>,
sub_group_info: SubGroupInfo,
physical_props: &P::PhysicalProps,
) {
self.memo.update_sub_group_info(group_id, expr_id, sub_group_info, physical_props)
}

pub(super) fn update_expr_children_sub_group_id(
&mut self,
expr_id: ExprId,
required_props: Vec<P::PhysicalProps>,
) -> ExprId {
self.memo.update_expr_children_sub_group_id(expr_id, required_props)
}

/// Get the properties of a Cascades group
/// P is the type of the property you expect
/// idx is the idx of the property you want. The order of properties is defined
/// by the property_builders parameter in CascadesOptimizer::new()
pub fn get_property_by_group<P: PropertyBuilder<T>>(
pub fn get_property_by_group<PB: PropertyBuilder<T>>(
&self,
group_id: GroupId,
idx: usize,
) -> P::Prop {
) -> PB::Prop {
self.memo.get_group(group_id).properties[idx]
.downcast_ref::<P::Prop>()
.downcast_ref::<PB::Prop>()
.unwrap()
.clone()
}
Expand All @@ -352,10 +426,11 @@ impl<T: RelNodeTyp> CascadesOptimizer<T> {
pub fn get_all_group_bindings(
&self,
group_id: GroupId,
sub_group_id: SubGroupId,
physical_only: bool,
) -> Vec<RelNodeRef<T>> {
self.memo
.get_all_group_bindings(group_id, physical_only, true, Some(10))
.get_all_group_bindings(group_id, sub_group_id, physical_only, true, Some(10))
}

pub(super) fn is_group_explored(&self, group_id: GroupId) -> bool {
Expand Down Expand Up @@ -390,12 +465,12 @@ impl<T: RelNodeTyp> CascadesOptimizer<T> {
}
}

impl<T: RelNodeTyp> Optimizer<T> for CascadesOptimizer<T> {
impl<T: RelNodeTyp, P: PhysicalPropsBuilder<T>> Optimizer<T> for CascadesOptimizer<T, P> {
fn optimize(&mut self, root_rel: RelNodeRef<T>) -> Result<RelNodeRef<T>> {
self.optimize_inner(root_rel)
}

fn get_property<P: PropertyBuilder<T>>(&self, root_rel: RelNodeRef<T>, idx: usize) -> P::Prop {
self.get_property_by_group::<P>(self.resolve_group_id(root_rel), idx)
fn get_property<PB: PropertyBuilder<T>>(&self, root_rel: RelNodeRef<T>, idx: usize) -> PB::Prop {
self.get_property_by_group::<PB>(self.resolve_group_id(root_rel), idx)
}
}
6 changes: 3 additions & 3 deletions optd-core/src/cascades/tasks.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use anyhow::Result;

use crate::rel_node::RelNodeTyp;
use crate::{physical_prop::PhysicalPropsBuilder, rel_node::RelNodeTyp};

use super::CascadesOptimizer;

Expand All @@ -16,8 +16,8 @@ pub use optimize_expression::OptimizeExpressionTask;
pub use optimize_group::OptimizeGroupTask;
pub use optimize_inputs::OptimizeInputsTask;

pub trait Task<T: RelNodeTyp>: 'static + Send + Sync {
fn execute(&self, optimizer: &mut CascadesOptimizer<T>) -> Result<Vec<Box<dyn Task<T>>>>;
pub trait Task<T: RelNodeTyp, P: PhysicalPropsBuilder<T>>: 'static + Send + Sync {
fn execute(&self, optimizer: &mut CascadesOptimizer<T, P>) -> Result<Vec<Box<dyn Task<T, P>>>>;
fn as_any(&self) -> &dyn std::any::Any;
fn describe(&self) -> String;
}
Loading