Skip to content
This repository has been archived by the owner on Jan 7, 2025. It is now read-only.

Commit

Permalink
Subquery Unnesting: Exists + In Support (#259)
Browse files Browse the repository at this point in the history
- Support uncorrelated/correlated IN (ANY) and EXISTS clauses (this
should effectively make subquery unnesting feature-complete!)
- TPC-H Q4, Q16, Q20, and Q22 working
- The remaining queries mostly seem to have plans that are too inefficient
(Q18 has a separate issue, with a [fix in the
pipeline](https://github.com/cmu-db/optd/pull/261), but its plan still
seems to be too slow).
  • Loading branch information
jurplel authored Dec 10, 2024
1 parent cca20d4 commit 271d288
Show file tree
Hide file tree
Showing 42 changed files with 1,306 additions and 183 deletions.
25 changes: 22 additions & 3 deletions optd-datafusion-bridge/src/from_optd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,18 @@ impl OptdPlanContext<'_> {
Some(else_expr),
)?)
}
FuncType::Not => {
let expr = args[0].clone();
Ok(physical_expr::expressions::not(expr)?)
}
FuncType::IsNull => {
let expr = args[0].clone();
Ok(physical_expr::expressions::is_null(expr)?)
}
FuncType::IsNotNull => {
let expr = args[0].clone();
Ok(physical_expr::expressions::is_not_null(expr)?)
}
_ => unreachable!(),
}
}
Expand Down Expand Up @@ -464,14 +476,21 @@ impl OptdPlanContext<'_> {
let physical_expr =
self.conv_from_optd_expr(node.cond(), &Arc::new(filter_schema.clone()))?;

if node.join_type() == JoinType::Cross {
if *node.join_type() == JoinType::Cross {
return Ok(Arc::new(CrossJoinExec::new(left_exec, right_exec))
as Arc<dyn ExecutionPlan + 'static>);
}

let join_type = match node.join_type() {
JoinType::Inner => datafusion::logical_expr::JoinType::Inner,
JoinType::LeftOuter => datafusion::logical_expr::JoinType::Left,
JoinType::Inner => datafusion_expr::JoinType::Inner,
JoinType::FullOuter => datafusion_expr::JoinType::Full,
JoinType::LeftOuter => datafusion_expr::JoinType::Left,
JoinType::RightOuter => datafusion_expr::JoinType::Right,
JoinType::LeftSemi => datafusion_expr::JoinType::LeftSemi,
JoinType::RightSemi => datafusion_expr::JoinType::RightSemi,
JoinType::LeftAnti => datafusion_expr::JoinType::LeftAnti,
JoinType::RightAnti => datafusion_expr::JoinType::RightAnti,
JoinType::LeftMark => datafusion_expr::JoinType::LeftMark,
_ => unimplemented!(),
};

Expand Down
89 changes: 75 additions & 14 deletions optd-datafusion-bridge/src/into_optd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
// Use of this source code is governed by an MIT-style license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.

use std::sync::Arc;

use anyhow::{bail, Result};
use datafusion::common::DFSchema;
use datafusion::logical_expr::{self, logical_plan, LogicalPlan, Operator};
Expand All @@ -15,7 +17,7 @@ use optd_datafusion_repr::plan_nodes::{
ConstantPred, DfReprPlanNode, DfReprPredNode, ExternColumnRefPred, FuncPred, FuncType,
InListPred, JoinType, LikePred, ListPred, LogOpPred, LogOpType, LogicalAgg,
LogicalEmptyRelation, LogicalFilter, LogicalJoin, LogicalLimit, LogicalProjection, LogicalScan,
LogicalSort, RawDependentJoin, SortOrderPred, SortOrderType,
LogicalSort, RawDependentJoin, SortOrderPred, SortOrderType, SubqueryType,
};
use optd_datafusion_repr::properties::schema::Schema as OptdSchema;

Expand All @@ -24,15 +26,18 @@ use crate::OptdPlanContext;
impl OptdPlanContext<'_> {
fn subqueries_to_dependent_joins(
&mut self,
subqueries: &[&Subquery],
subqueries: Vec<(&Subquery, SubqueryType)>,
input: ArcDfPlanNode,
input_schema: &DFSchema,
) -> Result<ArcDfPlanNode> {
let mut node = input;
for Subquery {
subquery,
outer_ref_columns,
} in subqueries.iter()
for (
Subquery {
subquery,
outer_ref_columns,
},
sq_typ,
) in subqueries.into_iter()
{
let subquery_root = self.conv_into_optd_plan_node(subquery, Some(input_schema))?;
let dep_join = RawDependentJoin::new(
Expand All @@ -56,7 +61,7 @@ impl OptdPlanContext<'_> {
})
.collect(),
),
JoinType::Cross,
sq_typ,
);
node = dep_join.into_plan_node();
}
Expand Down Expand Up @@ -92,7 +97,7 @@ impl OptdPlanContext<'_> {
expr: &'a logical_expr::Expr,
context: &DFSchema,
dep_ctx: Option<&DFSchema>,
subqueries: &mut Vec<&'a Subquery>,
subqueries: &mut Vec<(&'a Subquery, SubqueryType)>,
) -> Result<ArcDfPredNode> {
use logical_expr::Expr;
match expr {
Expand Down Expand Up @@ -257,6 +262,18 @@ impl OptdPlanContext<'_> {
)
.into_pred_node())
}
Expr::Not(x) => {
let expr = self.conv_into_optd_expr(x.as_ref(), context, dep_ctx, subqueries)?;
Ok(FuncPred::new(FuncType::Not, ListPred::new(vec![expr])).into_pred_node())
}
Expr::IsNull(x) => {
let expr = self.conv_into_optd_expr(x.as_ref(), context, dep_ctx, subqueries)?;
Ok(FuncPred::new(FuncType::IsNull, ListPred::new(vec![expr])).into_pred_node())
}
Expr::IsNotNull(x) => {
let expr = self.conv_into_optd_expr(x.as_ref(), context, dep_ctx, subqueries)?;
Ok(FuncPred::new(FuncType::IsNotNull, ListPred::new(vec![expr])).into_pred_node())
}
Expr::Between(x) => {
let expr =
self.conv_into_optd_expr(x.expr.as_ref(), context, dep_ctx, subqueries)?;
Expand Down Expand Up @@ -288,9 +305,53 @@ impl OptdPlanContext<'_> {
// This relies on a left-deep tree of dependent joins being
// generated below this node, in response to all pushed subqueries.
let new_column_ref_idx = context.fields().len() + subqueries.len();
subqueries.push(sq);
subqueries.push((sq, SubqueryType::Scalar));
Ok(ColumnRefPred::new(new_column_ref_idx).into_pred_node())
}
Expr::Exists(ex) => {
let sq = &ex.subquery;
let negated = ex.negated;

let new_column_ref_idx = context.fields().len() + subqueries.len();
subqueries.push((sq, SubqueryType::Exists));
if negated {
Ok(FuncPred::new(
FuncType::Not,
ListPred::new(
vec![ColumnRefPred::new(new_column_ref_idx).into_pred_node()],
),
)
.into_pred_node())
} else {
Ok(ColumnRefPred::new(new_column_ref_idx).into_pred_node())
}
}
Expr::InSubquery(insq) => {
let sq = &insq.subquery;
let expr =
self.conv_into_optd_expr(insq.expr.as_ref(), context, dep_ctx, subqueries)?;
let negated = insq.negated;

let new_column_ref_idx = context.fields().len() + subqueries.len();
subqueries.push((
sq,
SubqueryType::Any {
pred: Arc::unwrap_or_clone(expr),
op: BinOpType::Eq,
},
));
if negated {
Ok(FuncPred::new(
FuncType::Not,
ListPred::new(
vec![ColumnRefPred::new(new_column_ref_idx).into_pred_node()],
),
)
.into_pred_node())
} else {
Ok(ColumnRefPred::new(new_column_ref_idx).into_pred_node())
}
}
_ => bail!("Unsupported expression: {:?}", expr),
}
}
Expand All @@ -308,7 +369,7 @@ impl OptdPlanContext<'_> {
dep_ctx,
&mut subqueries,
)?;
let input = self.subqueries_to_dependent_joins(&subqueries, input, node.input.schema())?;
let input = self.subqueries_to_dependent_joins(subqueries, input, node.input.schema())?;
Ok(LogicalProjection::new(input, expr_list))
}

Expand All @@ -326,7 +387,7 @@ impl OptdPlanContext<'_> {
&mut subqueries,
)?;

let input = self.subqueries_to_dependent_joins(&subqueries, input, node.input.schema())?;
let input = self.subqueries_to_dependent_joins(subqueries, input, node.input.schema())?;

Ok(LogicalFilter::new(input, expr))
}
Expand All @@ -336,7 +397,7 @@ impl OptdPlanContext<'_> {
exprs: &'a [logical_expr::Expr],
context: &DFSchema,
dep_ctx: Option<&DFSchema>,
subqueries: &mut Vec<&'a Subquery>,
subqueries: &mut Vec<(&'a Subquery, SubqueryType)>,
) -> Result<ListPred> {
let exprs = exprs
.iter()
Expand All @@ -350,7 +411,7 @@ impl OptdPlanContext<'_> {
exprs: &'a [logical_expr::SortExpr],
context: &DFSchema,
dep_ctx: Option<&DFSchema>,
subqueries: &mut Vec<&'a Subquery>,
subqueries: &mut Vec<(&'a Subquery, SubqueryType)>,
) -> Result<ListPred> {
let exprs = exprs
.iter()
Expand Down Expand Up @@ -453,7 +514,7 @@ impl OptdPlanContext<'_> {
DFJoinType::RightAnti => JoinType::RightAnti,
DFJoinType::LeftSemi => JoinType::LeftSemi,
DFJoinType::RightSemi => JoinType::RightSemi,
_ => unimplemented!(),
DFJoinType::LeftMark => JoinType::LeftMark,
};
let mut log_ops = Vec::with_capacity(node.on.len());
let mut subqueries = vec![];
Expand Down
10 changes: 8 additions & 2 deletions optd-datafusion-repr-adv-cost/src/adv_stats/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,10 @@ impl<
) -> f64 {
match &expr_tree.typ {
DfPredType::Constant(_) => Self::get_constant_selectivity(expr_tree),
DfPredType::ColumnRef => unimplemented!("check bool type or else panic"),
DfPredType::ColumnRef => {
// TODO: Check that field is of bool type
0.5 // TODO: placeholder---how can we get the selectivity?
}
DfPredType::UnOp(un_op_typ) => {
assert!(expr_tree.children.len() == 1);
let child = expr_tree.child(0);
Expand Down Expand Up @@ -104,7 +107,10 @@ impl<
DfPredType::LogOp(log_op_typ) => {
self.get_log_op_selectivity(*log_op_typ, &expr_tree.children, schema, column_refs)
}
DfPredType::Func(_) => unimplemented!("check bool type or else panic"),
DfPredType::Func(_) => {
// TODO: Check that field is of bool type
0.5 // TODO: placeholder---how can we get the selectivity?
}
DfPredType::SortOrder(_) => {
panic!("the selectivity of sort order expressions is undefined")
}
Expand Down
8 changes: 7 additions & 1 deletion optd-datafusion-repr-adv-cost/src/adv_stats/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,8 @@ impl<
);
join_filter_selectivity
}
// TODO: Does this make sense?
JoinType::LeftMark => f64::max(inner_join_selectivity, 1.0 / right_row_cnt),
_ => unimplemented!("join_typ={} is not implemented", join_typ),
}
}
Expand Down Expand Up @@ -359,7 +361,11 @@ impl<
&self,
base_col_refs: HashSet<BaseTableColumnRef>,
) -> f64 {
assert!(base_col_refs.len() > 1);
// Hack to avoid issue w/ self joins...unsure if this is a good idea
if base_col_refs.len() <= 1 {
return 1.0;
}

let num_base_col_refs = base_col_refs.len();
base_col_refs
.into_iter()
Expand Down
2 changes: 1 addition & 1 deletion optd-datafusion-repr/src/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ pub fn explain_plan_node(
DfNodeType::RawDepJoin(_) => RawDependentJoin::from_plan_node(node)
.unwrap()
.explain(meta_map),
DfNodeType::DepJoin(_) => DependentJoin::from_plan_node(node)
DfNodeType::DepJoin => DependentJoin::from_plan_node(node)
.unwrap()
.explain(meta_map),
DfNodeType::Scan => LogicalScan::from_plan_node(node).unwrap().explain(meta_map),
Expand Down
4 changes: 2 additions & 2 deletions optd-datafusion-repr/src/memo_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ fn enumerate_join_order_expr_inner<M: Memo<DfNodeType> + ?Sized>(
visited: &mut HashMap<GroupId, Vec<LogicalJoinOrder>>,
) -> Vec<LogicalJoinOrder> {
let expr = memo.get_expr_memoed(current);
match expr.typ {
match &expr.typ {
DfNodeType::Scan => {
let table = memo.get_pred(expr.predicates[0]); // TODO: use unified repr
let table = ConstantPred::from_pred_node(table)
Expand All @@ -50,7 +50,7 @@ fn enumerate_join_order_expr_inner<M: Memo<DfNodeType> + ?Sized>(
.as_str();
vec![LogicalJoinOrder::Table(table)]
}
DfNodeType::Join(_) | DfNodeType::DepJoin(_) | DfNodeType::RawDepJoin(_) => {
DfNodeType::Join(_) | DfNodeType::DepJoin | DfNodeType::RawDepJoin(_) => {
// Assume child 0 == left, child 1 == right
let left = expr.children[0];
let right = expr.children[1];
Expand Down
8 changes: 4 additions & 4 deletions optd-datafusion-repr/src/plan_nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use pretty_xmlish::{Pretty, PrettyConfig};
pub use projection::{LogicalProjection, PhysicalProjection};
pub use scan::{LogicalScan, PhysicalScan};
pub use sort::{LogicalSort, PhysicalSort};
pub use subquery::{DependentJoin, RawDependentJoin}; // Add missing import
pub use subquery::{DependentJoin, RawDependentJoin, SubqueryType};

use crate::explain::{explain_plan_node, explain_pred_node};

Expand Down Expand Up @@ -69,16 +69,16 @@ impl std::fmt::Display for DfPredType {

/// DfNodeType FAQ:
/// - The define_plan_node!() macro defines what the children of each join node are
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum DfNodeType {
// Developers: update `is_logical` function after adding new plan nodes
// Plan nodes
Projection,
Filter,
Scan,
Join(JoinType),
RawDepJoin(JoinType),
DepJoin(JoinType),
RawDepJoin(SubqueryType),
DepJoin,
Sort,
Agg,
EmptyRelation,
Expand Down
1 change: 1 addition & 0 deletions optd-datafusion-repr/src/plan_nodes/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub enum JoinType {
RightSemi,
LeftAnti,
RightAnti,
LeftMark,
}

impl Display for JoinType {
Expand Down
8 changes: 4 additions & 4 deletions optd-datafusion-repr/src/plan_nodes/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ macro_rules! define_plan_node {

fn from_plan_node(plan_node: ArcDfPlanNode) -> Option<Self> {
#[allow(unused_variables)]
if let DfNodeType :: $variant $( ($inner_name) )? = plan_node.typ {
if let DfNodeType :: $variant $( ($inner_name) )? = &plan_node.typ {
Some(Self(plan_node))
} else {
None
Expand Down Expand Up @@ -105,9 +105,9 @@ macro_rules! define_plan_node {
)*

$(
pub fn $inner_name(&self) -> JoinType {
if let DfNodeType :: $variant ($inner_name) = self.0 .typ {
return $inner_name;
pub fn $inner_name(&self) -> &$inner_typ {
if let DfNodeType :: $variant ($inner_name) = &self.0.typ {
return &$inner_name;
} else {
unreachable!();
}
Expand Down
3 changes: 3 additions & 0 deletions optd-datafusion-repr/src/plan_nodes/predicates/func_pred.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ pub enum FuncType {
Scalar(String, DataType),
Agg(String),
Case,
Not,
IsNull,
IsNotNull,
}

impl std::fmt::Display for FuncType {
Expand Down
Loading

0 comments on commit 271d288

Please sign in to comment.