Skip to content
This repository was archived by the owner on Jan 7, 2025. It is now read-only.

Subquery Unnesting: Exists + In Support #259

Merged
merged 57 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
99ed87e
Add all tpch queries (from risinglightdb tests)
jurplel Dec 6, 2024
5e4f33a
Newline normalization
jurplel Dec 6, 2024
298ab67
Fix BinOp schema property issue
jurplel Dec 6, 2024
6547052
tpch q11 fix
jurplel Dec 6, 2024
b62ce72
Merge branch 'bowad/tpch-q11-fix' of github.com:cmu-db/optd into bowa…
jurplel Dec 6, 2024
768982d
Update sqlplannertest plans
jurplel Dec 6, 2024
52359fa
Merge branch 'main' into bowad/tpch-q11-fix
jurplel Dec 6, 2024
01774d1
Delete q11 again
jurplel Dec 6, 2024
4c2c7fd
fix a couple of depjoin agg pushdown bugs
jurplel Dec 6, 2024
66a310d
un-disable tpch-q17
jurplel Dec 6, 2024
21d490a
Fix another bug w/ init distinct
jurplel Dec 6, 2024
f4108b3
Write comment for init distinct fix
jurplel Dec 6, 2024
a945abf
Merge
jurplel Dec 6, 2024
f055605
Update sqlplannertest plans
jurplel Dec 6, 2024
0dcce3d
Add test for out-of-order extern columns in subquery
jurplel Dec 6, 2024
b468692
add unnest test w/ nulls from agg
jurplel Dec 6, 2024
78c2b5e
Implement outer join agg null fix
jurplel Dec 6, 2024
9edf1af
Count(*) fix
jurplel Dec 7, 2024
bab134f
Merge branch 'main' into bowad/unnest-agg-null-fix
jurplel Dec 7, 2024
b0b9e4e
planner test updates
jurplel Dec 7, 2024
9778e25
clippy
jurplel Dec 7, 2024
87c9c09
Unused variable
jurplel Dec 7, 2024
59ee25b
Initial correlated EXISTS support
jurplel Dec 7, 2024
712f320
Avoid self join issue in adv cost model
jurplel Dec 7, 2024
6310156
Merge branch 'bowad/unnest-agg-null-fix' into bowad/unnest-exists
jurplel Dec 7, 2024
5e283ce
Q4 working
jurplel Dec 7, 2024
ee74f94
Q4
jurplel Dec 7, 2024
c37a3e5
Update sqlplannertest plans
jurplel Dec 7, 2024
2ef7682
Support for NOT EXISTS + simplify approach somewhat...not sure this w…
jurplel Dec 7, 2024
50ac700
Better simulate mark join by using left outer join + more complex test
jurplel Dec 7, 2024
b101fd7
Make it more complicated & more correct + schema modifications
jurplel Dec 7, 2024
c2c908e
Fix NULL not printing
jurplel Dec 7, 2024
0d33ef3
Add in tests
jurplel Dec 7, 2024
951fbd2
Merge branch 'bowad/unnest-agg-null-fix' into bowad/unnest-exists
jurplel Dec 7, 2024
b9c70ee
merge w/ main (and tests are failing)
jurplel Dec 7, 2024
0fc8cb9
Fix not passing all columns through (I think this was a bug?)
jurplel Dec 7, 2024
7d84912
Merge branch 'bowad/unnest-agg-null-fix' into bowad/unnest-exists
jurplel Dec 7, 2024
f94650a
Update planner tests
jurplel Dec 7, 2024
1f0a7e7
Q4 is working
jurplel Dec 7, 2024
61bbb91
Merge
jurplel Dec 8, 2024
d5a6a47
mark join support
jurplel Dec 8, 2024
5f87473
Implement exists w/ mark join
jurplel Dec 8, 2024
fa3e99c
maybe more correct IN?
jurplel Dec 9, 2024
d753945
Handle correlated IN (hopefully) properly, now
jurplel Dec 9, 2024
f9aaccb
Assign TODO to myself
jurplel Dec 9, 2024
20bd190
update in exists to be correlated
jurplel Dec 9, 2024
41867f3
Uncorrelated IN
jurplel Dec 9, 2024
3a93414
Q16 working
jurplel Dec 9, 2024
8876883
Unnesting of correlated EXISTS clauses
jurplel Dec 9, 2024
83efff6
Update planner tests
jurplel Dec 9, 2024
9b9152c
Cleanup
jurplel Dec 9, 2024
3f03aca
Clippy
jurplel Dec 9, 2024
beec1ea
Change to dbg assert
jurplel Dec 9, 2024
fa7e41e
One more clippy warning...
jurplel Dec 9, 2024
34e70f0
Fix assertion bug for Q20 and Q22
jurplel Dec 9, 2024
b741c83
Add new queries to sqlplannertest + delete extraneous file
jurplel Dec 10, 2024
bcc28c9
Merge branch 'main' into bowad/unnest-exists
jurplel Dec 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 22 additions & 3 deletions optd-datafusion-bridge/src/from_optd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,18 @@ impl OptdPlanContext<'_> {
Some(else_expr),
)?)
}
FuncType::Not => {
let expr = args[0].clone();
Ok(physical_expr::expressions::not(expr)?)
}
FuncType::IsNull => {
let expr = args[0].clone();
Ok(physical_expr::expressions::is_null(expr)?)
}
FuncType::IsNotNull => {
let expr = args[0].clone();
Ok(physical_expr::expressions::is_not_null(expr)?)
}
_ => unreachable!(),
}
}
Expand Down Expand Up @@ -464,14 +476,21 @@ impl OptdPlanContext<'_> {
let physical_expr =
self.conv_from_optd_expr(node.cond(), &Arc::new(filter_schema.clone()))?;

if node.join_type() == JoinType::Cross {
if *node.join_type() == JoinType::Cross {
return Ok(Arc::new(CrossJoinExec::new(left_exec, right_exec))
as Arc<dyn ExecutionPlan + 'static>);
}

let join_type = match node.join_type() {
JoinType::Inner => datafusion::logical_expr::JoinType::Inner,
JoinType::LeftOuter => datafusion::logical_expr::JoinType::Left,
JoinType::Inner => datafusion_expr::JoinType::Inner,
JoinType::FullOuter => datafusion_expr::JoinType::Full,
JoinType::LeftOuter => datafusion_expr::JoinType::Left,
JoinType::RightOuter => datafusion_expr::JoinType::Right,
JoinType::LeftSemi => datafusion_expr::JoinType::LeftSemi,
JoinType::RightSemi => datafusion_expr::JoinType::RightSemi,
JoinType::LeftAnti => datafusion_expr::JoinType::LeftAnti,
JoinType::RightAnti => datafusion_expr::JoinType::RightAnti,
JoinType::LeftMark => datafusion_expr::JoinType::LeftMark,
_ => unimplemented!(),
};

Expand Down
89 changes: 75 additions & 14 deletions optd-datafusion-bridge/src/into_optd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
// Use of this source code is governed by an MIT-style license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.

use std::sync::Arc;

use anyhow::{bail, Result};
use datafusion::common::DFSchema;
use datafusion::logical_expr::{self, logical_plan, LogicalPlan, Operator};
Expand All @@ -15,7 +17,7 @@ use optd_datafusion_repr::plan_nodes::{
ConstantPred, DfReprPlanNode, DfReprPredNode, ExternColumnRefPred, FuncPred, FuncType,
InListPred, JoinType, LikePred, ListPred, LogOpPred, LogOpType, LogicalAgg,
LogicalEmptyRelation, LogicalFilter, LogicalJoin, LogicalLimit, LogicalProjection, LogicalScan,
LogicalSort, RawDependentJoin, SortOrderPred, SortOrderType,
LogicalSort, RawDependentJoin, SortOrderPred, SortOrderType, SubqueryType,
};
use optd_datafusion_repr::properties::schema::Schema as OptdSchema;

Expand All @@ -24,15 +26,18 @@ use crate::OptdPlanContext;
impl OptdPlanContext<'_> {
fn subqueries_to_dependent_joins(
&mut self,
subqueries: &[&Subquery],
subqueries: Vec<(&Subquery, SubqueryType)>,
input: ArcDfPlanNode,
input_schema: &DFSchema,
) -> Result<ArcDfPlanNode> {
let mut node = input;
for Subquery {
subquery,
outer_ref_columns,
} in subqueries.iter()
for (
Subquery {
subquery,
outer_ref_columns,
},
sq_typ,
) in subqueries.into_iter()
{
let subquery_root = self.conv_into_optd_plan_node(subquery, Some(input_schema))?;
let dep_join = RawDependentJoin::new(
Expand All @@ -56,7 +61,7 @@ impl OptdPlanContext<'_> {
})
.collect(),
),
JoinType::Cross,
sq_typ,
);
node = dep_join.into_plan_node();
}
Expand Down Expand Up @@ -92,7 +97,7 @@ impl OptdPlanContext<'_> {
expr: &'a logical_expr::Expr,
context: &DFSchema,
dep_ctx: Option<&DFSchema>,
subqueries: &mut Vec<&'a Subquery>,
subqueries: &mut Vec<(&'a Subquery, SubqueryType)>,
) -> Result<ArcDfPredNode> {
use logical_expr::Expr;
match expr {
Expand Down Expand Up @@ -257,6 +262,18 @@ impl OptdPlanContext<'_> {
)
.into_pred_node())
}
Expr::Not(x) => {
let expr = self.conv_into_optd_expr(x.as_ref(), context, dep_ctx, subqueries)?;
Ok(FuncPred::new(FuncType::Not, ListPred::new(vec![expr])).into_pred_node())
}
Expr::IsNull(x) => {
let expr = self.conv_into_optd_expr(x.as_ref(), context, dep_ctx, subqueries)?;
Ok(FuncPred::new(FuncType::IsNull, ListPred::new(vec![expr])).into_pred_node())
}
Expr::IsNotNull(x) => {
let expr = self.conv_into_optd_expr(x.as_ref(), context, dep_ctx, subqueries)?;
Ok(FuncPred::new(FuncType::IsNotNull, ListPred::new(vec![expr])).into_pred_node())
}
Expr::Between(x) => {
let expr =
self.conv_into_optd_expr(x.expr.as_ref(), context, dep_ctx, subqueries)?;
Expand Down Expand Up @@ -288,9 +305,53 @@ impl OptdPlanContext<'_> {
// This relies on a left-deep tree of dependent joins being
// generated below this node, in response to all pushed subqueries.
let new_column_ref_idx = context.fields().len() + subqueries.len();
subqueries.push(sq);
subqueries.push((sq, SubqueryType::Scalar));
Ok(ColumnRefPred::new(new_column_ref_idx).into_pred_node())
}
Expr::Exists(ex) => {
let sq = &ex.subquery;
let negated = ex.negated;

let new_column_ref_idx = context.fields().len() + subqueries.len();
subqueries.push((sq, SubqueryType::Exists));
if negated {
Ok(FuncPred::new(
FuncType::Not,
ListPred::new(
vec![ColumnRefPred::new(new_column_ref_idx).into_pred_node()],
),
)
.into_pred_node())
} else {
Ok(ColumnRefPred::new(new_column_ref_idx).into_pred_node())
}
}
Expr::InSubquery(insq) => {
let sq = &insq.subquery;
let expr =
self.conv_into_optd_expr(insq.expr.as_ref(), context, dep_ctx, subqueries)?;
let negated = insq.negated;

let new_column_ref_idx = context.fields().len() + subqueries.len();
subqueries.push((
sq,
SubqueryType::Any {
pred: Arc::unwrap_or_clone(expr),
op: BinOpType::Eq,
},
));
if negated {
Ok(FuncPred::new(
FuncType::Not,
ListPred::new(
vec![ColumnRefPred::new(new_column_ref_idx).into_pred_node()],
),
)
.into_pred_node())
} else {
Ok(ColumnRefPred::new(new_column_ref_idx).into_pred_node())
}
}
_ => bail!("Unsupported expression: {:?}", expr),
}
}
Expand All @@ -308,7 +369,7 @@ impl OptdPlanContext<'_> {
dep_ctx,
&mut subqueries,
)?;
let input = self.subqueries_to_dependent_joins(&subqueries, input, node.input.schema())?;
let input = self.subqueries_to_dependent_joins(subqueries, input, node.input.schema())?;
Ok(LogicalProjection::new(input, expr_list))
}

Expand All @@ -326,7 +387,7 @@ impl OptdPlanContext<'_> {
&mut subqueries,
)?;

let input = self.subqueries_to_dependent_joins(&subqueries, input, node.input.schema())?;
let input = self.subqueries_to_dependent_joins(subqueries, input, node.input.schema())?;

Ok(LogicalFilter::new(input, expr))
}
Expand All @@ -336,7 +397,7 @@ impl OptdPlanContext<'_> {
exprs: &'a [logical_expr::Expr],
context: &DFSchema,
dep_ctx: Option<&DFSchema>,
subqueries: &mut Vec<&'a Subquery>,
subqueries: &mut Vec<(&'a Subquery, SubqueryType)>,
) -> Result<ListPred> {
let exprs = exprs
.iter()
Expand All @@ -350,7 +411,7 @@ impl OptdPlanContext<'_> {
exprs: &'a [logical_expr::SortExpr],
context: &DFSchema,
dep_ctx: Option<&DFSchema>,
subqueries: &mut Vec<&'a Subquery>,
subqueries: &mut Vec<(&'a Subquery, SubqueryType)>,
) -> Result<ListPred> {
let exprs = exprs
.iter()
Expand Down Expand Up @@ -453,7 +514,7 @@ impl OptdPlanContext<'_> {
DFJoinType::RightAnti => JoinType::RightAnti,
DFJoinType::LeftSemi => JoinType::LeftSemi,
DFJoinType::RightSemi => JoinType::RightSemi,
_ => unimplemented!(),
DFJoinType::LeftMark => JoinType::LeftMark,
};
let mut log_ops = Vec::with_capacity(node.on.len());
let mut subqueries = vec![];
Expand Down
10 changes: 8 additions & 2 deletions optd-datafusion-repr-adv-cost/src/adv_stats/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,10 @@ impl<
) -> f64 {
match &expr_tree.typ {
DfPredType::Constant(_) => Self::get_constant_selectivity(expr_tree),
DfPredType::ColumnRef => unimplemented!("check bool type or else panic"),
DfPredType::ColumnRef => {
// TODO: Check that field is of bool type
0.5 // TODO: placeholder---how can we get the selectivity?
Comment on lines +70 to +71
Copy link
Member Author

@jurplel jurplel Dec 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

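This should be addressed: the 0.5 selectivity for boolean column refs is only a placeholder, and the check that the field is of bool type is still a TODO.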

}
DfPredType::UnOp(un_op_typ) => {
assert!(expr_tree.children.len() == 1);
let child = expr_tree.child(0);
Expand Down Expand Up @@ -104,7 +107,10 @@ impl<
DfPredType::LogOp(log_op_typ) => {
self.get_log_op_selectivity(*log_op_typ, &expr_tree.children, schema, column_refs)
}
DfPredType::Func(_) => unimplemented!("check bool type or else panic"),
DfPredType::Func(_) => {
// TODO: Check that field is of bool type
0.5 // TODO: placeholder---how can we get the selectivity?
Comment on lines +111 to +112
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

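Ditto: the 0.5 placeholder selectivity for function predicates (and the missing bool-type check) should be addressed as well.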

}
DfPredType::SortOrder(_) => {
panic!("the selectivity of sort order expressions is undefined")
}
Expand Down
8 changes: 7 additions & 1 deletion optd-datafusion-repr-adv-cost/src/adv_stats/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,8 @@ impl<
);
join_filter_selectivity
}
// TODO: Does this make sense?
Copy link
Member Author

@jurplel jurplel Dec 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

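Ditto: the LeftMark selectivity estimate below is still an open question ("Does this make sense?") and should be addressed.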

JoinType::LeftMark => f64::max(inner_join_selectivity, 1.0 / right_row_cnt),
_ => unimplemented!("join_typ={} is not implemented", join_typ),
}
}
Expand Down Expand Up @@ -359,7 +361,11 @@ impl<
&self,
base_col_refs: HashSet<BaseTableColumnRef>,
) -> f64 {
assert!(base_col_refs.len() > 1);
// Hack to avoid issue w/ self joins...unsure if this is a good idea
if base_col_refs.len() <= 1 {
return 1.0;
}

let num_base_col_refs = base_col_refs.len();
base_col_refs
.into_iter()
Expand Down
2 changes: 1 addition & 1 deletion optd-datafusion-repr/src/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ pub fn explain_plan_node(
DfNodeType::RawDepJoin(_) => RawDependentJoin::from_plan_node(node)
.unwrap()
.explain(meta_map),
DfNodeType::DepJoin(_) => DependentJoin::from_plan_node(node)
DfNodeType::DepJoin => DependentJoin::from_plan_node(node)
.unwrap()
.explain(meta_map),
DfNodeType::Scan => LogicalScan::from_plan_node(node).unwrap().explain(meta_map),
Expand Down
4 changes: 2 additions & 2 deletions optd-datafusion-repr/src/memo_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ fn enumerate_join_order_expr_inner<M: Memo<DfNodeType> + ?Sized>(
visited: &mut HashMap<GroupId, Vec<LogicalJoinOrder>>,
) -> Vec<LogicalJoinOrder> {
let expr = memo.get_expr_memoed(current);
match expr.typ {
match &expr.typ {
DfNodeType::Scan => {
let table = memo.get_pred(expr.predicates[0]); // TODO: use unified repr
let table = ConstantPred::from_pred_node(table)
Expand All @@ -50,7 +50,7 @@ fn enumerate_join_order_expr_inner<M: Memo<DfNodeType> + ?Sized>(
.as_str();
vec![LogicalJoinOrder::Table(table)]
}
DfNodeType::Join(_) | DfNodeType::DepJoin(_) | DfNodeType::RawDepJoin(_) => {
DfNodeType::Join(_) | DfNodeType::DepJoin | DfNodeType::RawDepJoin(_) => {
// Assume child 0 == left, child 1 == right
let left = expr.children[0];
let right = expr.children[1];
Expand Down
8 changes: 4 additions & 4 deletions optd-datafusion-repr/src/plan_nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use pretty_xmlish::{Pretty, PrettyConfig};
pub use projection::{LogicalProjection, PhysicalProjection};
pub use scan::{LogicalScan, PhysicalScan};
pub use sort::{LogicalSort, PhysicalSort};
pub use subquery::{DependentJoin, RawDependentJoin}; // Add missing import
pub use subquery::{DependentJoin, RawDependentJoin, SubqueryType};

use crate::explain::{explain_plan_node, explain_pred_node};

Expand Down Expand Up @@ -69,16 +69,16 @@ impl std::fmt::Display for DfPredType {

/// DfNodeType FAQ:
/// - The define_plan_node!() macro defines what the children of each join node are
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum DfNodeType {
// Developers: update `is_logical` function after adding new plan nodes
// Plan nodes
Projection,
Filter,
Scan,
Join(JoinType),
RawDepJoin(JoinType),
DepJoin(JoinType),
RawDepJoin(SubqueryType),
DepJoin,
Sort,
Agg,
EmptyRelation,
Expand Down
1 change: 1 addition & 0 deletions optd-datafusion-repr/src/plan_nodes/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub enum JoinType {
RightSemi,
LeftAnti,
RightAnti,
LeftMark,
}

impl Display for JoinType {
Expand Down
8 changes: 4 additions & 4 deletions optd-datafusion-repr/src/plan_nodes/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ macro_rules! define_plan_node {

fn from_plan_node(plan_node: ArcDfPlanNode) -> Option<Self> {
#[allow(unused_variables)]
if let DfNodeType :: $variant $( ($inner_name) )? = plan_node.typ {
if let DfNodeType :: $variant $( ($inner_name) )? = &plan_node.typ {
Some(Self(plan_node))
} else {
None
Expand Down Expand Up @@ -105,9 +105,9 @@ macro_rules! define_plan_node {
)*

$(
pub fn $inner_name(&self) -> JoinType {
if let DfNodeType :: $variant ($inner_name) = self.0 .typ {
return $inner_name;
pub fn $inner_name(&self) -> &$inner_typ {
if let DfNodeType :: $variant ($inner_name) = &self.0.typ {
return &$inner_name;
} else {
unreachable!();
}
Expand Down
3 changes: 3 additions & 0 deletions optd-datafusion-repr/src/plan_nodes/predicates/func_pred.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ pub enum FuncType {
Scalar(String, DataType),
Agg(String),
Case,
Not,
IsNull,
IsNotNull,
}

impl std::fmt::Display for FuncType {
Expand Down
Loading
Loading