Skip to content
This repository was archived by the owner on Jan 7, 2025. It is now read-only.

Commit 5065c42

Browse files
jurplelskyzh
andauthored
feat: unnesting arbitrary subqueries (likely broken) (#180)
Somewhere between a proof of concept and a draft—work still heavily in progress. Will already successfully parse and fully unnest a subset of correlated and uncorrelated subqueries (although I am uncertain about correctness). **TODO**: - [ ] Formal testing - [ ] EXISTS clauses - [ ] IN clauses - [ ] ANY/ALL clauses - [ ] Correctness issue with COUNT(*) (requires adding left outer join to plan) - [x] Move some/all of this to rewriting stage to support multiple subqueries/ordering operations - [x] “Sideways information passing” (subplans are duplicated now instead of making a DAG) - It seems that a DAG representation is only supported by looking for groups that appear the same. It looks to me that the cloned branches generated by this PR are indeed marked with the same group ID. I marked this bullet point as completed with this in mind. - [ ] Support more pushdowns (e.g. limit, joins) - [ ] Optimizations from the paper are all missing (Out of scope?) --------- Signed-off-by: Alex Chi <[email protected]> Co-authored-by: Alex Chi <[email protected]>
1 parent f8f714c commit 5065c42

File tree

14 files changed

+898
-72
lines changed

14 files changed

+898
-72
lines changed

optd-datafusion-bridge/src/into_optd.rs

Lines changed: 227 additions & 61 deletions
Large diffs are not rendered by default.

optd-datafusion-repr/src/lib.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,10 @@ use rules::{
3232

3333
pub use optd_core::rel_node::Value;
3434

35+
use crate::rules::{
36+
DepInitialDistinct, DepJoinEliminateAtScan, DepJoinPastAgg, DepJoinPastFilter, DepJoinPastProj,
37+
};
38+
3539
pub mod cost;
3640
mod explain;
3741
pub mod plan_nodes;
@@ -88,6 +92,11 @@ impl DatafusionOptimizer {
8892
Arc::new(EliminateLimitRule::new()),
8993
Arc::new(EliminateDuplicatedSortExprRule::new()),
9094
Arc::new(EliminateDuplicatedAggExprRule::new()),
95+
Arc::new(DepJoinEliminateAtScan::new()),
96+
Arc::new(DepInitialDistinct::new()),
97+
Arc::new(DepJoinPastProj::new()),
98+
Arc::new(DepJoinPastFilter::new()),
99+
Arc::new(DepJoinPastAgg::new()),
91100
Arc::new(ProjectMergeRule::new()),
92101
Arc::new(FilterMergeRule::new()),
93102
]
@@ -159,7 +168,7 @@ impl DatafusionOptimizer {
159168
),
160169
hueristic_optimizer: HeuristicsOptimizer::new_with_rules(
161170
heuristic_rules,
162-
ApplyOrder::BottomUp,
171+
ApplyOrder::TopDown, // uhh TODO reconsider
163172
property_builders.clone(),
164173
),
165174
enable_adaptive,

optd-datafusion-repr/src/plan_nodes.rs

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ pub(super) mod macros;
1111
mod projection;
1212
mod scan;
1313
mod sort;
14+
mod subquery;
1415

1516
use std::fmt::Debug;
1617
use std::sync::Arc;
@@ -37,6 +38,7 @@ use pretty_xmlish::{Pretty, PrettyConfig};
3738
pub use projection::{LogicalProjection, PhysicalProjection};
3839
pub use scan::{LogicalScan, PhysicalScan};
3940
pub use sort::{LogicalSort, PhysicalSort};
41+
pub use subquery::{DependentJoin, ExternColumnRefExpr, RawDependentJoin}; // Add missing import
4042

4143
use crate::properties::schema::{Schema, SchemaPropertyBuilder};
4244

@@ -52,6 +54,8 @@ pub enum OptRelNodeTyp {
5254
Filter,
5355
Scan,
5456
Join(JoinType),
57+
RawDepJoin(JoinType),
58+
DepJoin(JoinType),
5559
Sort,
5660
Agg,
5761
Apply(ApplyType),
@@ -70,6 +74,7 @@ pub enum OptRelNodeTyp {
7074
// Expressions
7175
Constant(ConstantType),
7276
ColumnRef,
77+
ExternColumnRef,
7378
UnOp(UnOpType),
7479
BinOp(BinOpType),
7580
LogOp(LogOpType),
@@ -90,6 +95,8 @@ impl OptRelNodeTyp {
9095
| Self::Filter
9196
| Self::Scan
9297
| Self::Join(_)
98+
| Self::RawDepJoin(_)
99+
| Self::DepJoin(_)
93100
| Self::Apply(_)
94101
| Self::Sort
95102
| Self::Agg
@@ -112,6 +119,7 @@ impl OptRelNodeTyp {
112119
self,
113120
Self::Constant(_)
114121
| Self::ColumnRef
122+
| Self::ExternColumnRef
115123
| Self::UnOp(_)
116124
| Self::BinOp(_)
117125
| Self::LogOp(_)
@@ -295,7 +303,7 @@ impl Expr {
295303
/// the call stack, and no expression will be returned.
296304
pub fn rewrite_column_refs(
297305
&self,
298-
rewrite_fn: &impl Fn(usize) -> Option<usize>,
306+
rewrite_fn: &mut impl FnMut(usize) -> Option<usize>,
299307
) -> Option<Self> {
300308
assert!(self.typ().is_expression());
301309
if let OptRelNodeTyp::ColumnRef = self.typ() {
@@ -314,8 +322,17 @@ impl Expr {
314322
.into_iter()
315323
.map(|child| {
316324
if child.typ == OptRelNodeTyp::List {
317-
// TODO: What should we do with List?
318-
return Some(child);
325+
return Some(
326+
ExprList::new(
327+
ExprList::from_rel_node(child.clone())
328+
.unwrap()
329+
.to_vec()
330+
.into_iter()
331+
.map(|x| x.rewrite_column_refs(rewrite_fn).unwrap())
332+
.collect(),
333+
)
334+
.into_rel_node(),
335+
);
319336
}
320337
Expr::from_rel_node(child.clone())
321338
.unwrap()
@@ -392,6 +409,9 @@ pub fn explain(rel_node: OptRelNodeRef, meta_map: Option<&RelNodeMetaMap>) -> Pr
392409
OptRelNodeTyp::ColumnRef => ColumnRefExpr::from_rel_node(rel_node)
393410
.unwrap()
394411
.dispatch_explain(meta_map),
412+
OptRelNodeTyp::ExternColumnRef => ExternColumnRefExpr::from_rel_node(rel_node)
413+
.unwrap()
414+
.dispatch_explain(meta_map),
395415
OptRelNodeTyp::Constant(_) => ConstantExpr::from_rel_node(rel_node)
396416
.unwrap()
397417
.dispatch_explain(meta_map),
@@ -407,6 +427,12 @@ pub fn explain(rel_node: OptRelNodeRef, meta_map: Option<&RelNodeMetaMap>) -> Pr
407427
OptRelNodeTyp::Join(_) => LogicalJoin::from_rel_node(rel_node)
408428
.unwrap()
409429
.dispatch_explain(meta_map),
430+
OptRelNodeTyp::RawDepJoin(_) => RawDependentJoin::from_rel_node(rel_node)
431+
.unwrap()
432+
.dispatch_explain(meta_map),
433+
OptRelNodeTyp::DepJoin(_) => DependentJoin::from_rel_node(rel_node)
434+
.unwrap()
435+
.dispatch_explain(meta_map),
410436
OptRelNodeTyp::Scan => LogicalScan::from_rel_node(rel_node)
411437
.unwrap()
412438
.dispatch_explain(meta_map),
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
use optd_core::rel_node::{RelNode, RelNodeMetaMap, Value};
2+
use pretty_xmlish::Pretty;
3+
4+
use super::macros::define_plan_node;
5+
use super::{Expr, ExprList, JoinType, OptRelNode, OptRelNodeRef, OptRelNodeTyp, PlanNode};
6+
7+
#[derive(Clone, Debug)]
8+
pub struct RawDependentJoin(pub PlanNode);
9+
10+
define_plan_node!(
11+
RawDependentJoin : PlanNode,
12+
RawDepJoin, [
13+
{ 0, left: PlanNode },
14+
{ 1, right: PlanNode }
15+
], [
16+
{ 2, cond: Expr },
17+
{ 3, extern_cols: ExprList }
18+
], { join_type: JoinType }
19+
);
20+
21+
#[derive(Clone, Debug)]
22+
pub struct DependentJoin(pub PlanNode);
23+
24+
define_plan_node!(
25+
DependentJoin : PlanNode,
26+
DepJoin, [
27+
{ 0, left: PlanNode },
28+
{ 1, right: PlanNode }
29+
], [
30+
{ 2, cond: Expr },
31+
{ 3, extern_cols: ExprList }
32+
], { join_type: JoinType }
33+
);
34+
35+
#[derive(Clone, Debug)]
36+
pub struct ExternColumnRefExpr(pub Expr);
37+
38+
impl ExternColumnRefExpr {
39+
/// Creates a new `DepExternColumnRef` expression.
40+
pub fn new(column_idx: usize) -> ExternColumnRefExpr {
41+
// this conversion is always safe since usize is at most u64
42+
let u64_column_idx = column_idx as u64;
43+
ExternColumnRefExpr(Expr(
44+
RelNode {
45+
typ: OptRelNodeTyp::ExternColumnRef,
46+
children: vec![],
47+
data: Some(Value::UInt64(u64_column_idx)),
48+
}
49+
.into(),
50+
))
51+
}
52+
53+
fn get_data_usize(&self) -> usize {
54+
self.0 .0.data.as_ref().unwrap().as_u64() as usize
55+
}
56+
57+
/// Gets the column index.
58+
pub fn index(&self) -> usize {
59+
self.get_data_usize()
60+
}
61+
}
62+
63+
impl OptRelNode for ExternColumnRefExpr {
64+
fn into_rel_node(self) -> OptRelNodeRef {
65+
self.0.into_rel_node()
66+
}
67+
68+
fn from_rel_node(rel_node: OptRelNodeRef) -> Option<Self> {
69+
if rel_node.typ != OptRelNodeTyp::ExternColumnRef {
70+
return None;
71+
}
72+
Expr::from_rel_node(rel_node).map(Self)
73+
}
74+
75+
fn dispatch_explain(&self, _meta_map: Option<&RelNodeMetaMap>) -> Pretty<'static> {
76+
Pretty::display(&format!("Extern(#{})", self.index()))
77+
}
78+
}

optd-datafusion-repr/src/properties/column_ref.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -362,7 +362,7 @@ impl PropertyBuilder<OptRelNodeTyp> for ColumnRefPropertyBuilder {
362362
GroupColumnRefs::new(column_refs, child.output_correlation.clone())
363363
}
364364
// Should account for all physical join types.
365-
OptRelNodeTyp::Join(join_type) => {
365+
OptRelNodeTyp::Join(join_type) | OptRelNodeTyp::RawDepJoin(join_type) | OptRelNodeTyp::DepJoin(join_type)=> {
366366
// Concatenate left and right children column refs.
367367
let column_refs = Self::concat_children_col_refs(&children[0..2]);
368368
// Merge the equal columns of two children as input correlation.
@@ -465,6 +465,7 @@ impl PropertyBuilder<OptRelNodeTyp> for ColumnRefPropertyBuilder {
465465
GroupColumnRefs::new(column_refs, correlation)
466466
}
467467
OptRelNodeTyp::Constant(_)
468+
| OptRelNodeTyp::ExternColumnRef // TODO Possibly very very wrong---consult cost model team
468469
| OptRelNodeTyp::Func(_)
469470
| OptRelNodeTyp::DataType(_)
470471
| OptRelNodeTyp::Between

optd-datafusion-repr/src/properties/schema.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ impl PropertyBuilder<OptRelNodeTyp> for SchemaPropertyBuilder {
7373
}
7474
OptRelNodeTyp::Projection => children[1].clone(),
7575
OptRelNodeTyp::Filter => children[0].clone(),
76-
OptRelNodeTyp::Join(_) => {
76+
OptRelNodeTyp::DepJoin(_) | OptRelNodeTyp::Join(_) => {
7777
let mut schema = children[0].clone();
7878
let schema2 = children[1].clone();
7979
schema.fields.extend(schema2.fields);

optd-datafusion-repr/src/rules.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ mod joins;
77
mod macros;
88
mod physical;
99
mod project_transpose;
10+
mod subquery;
1011

1112
// pub use filter_join::FilterJoinPullUpRule;
1213
pub use eliminate_duplicated_expr::{
@@ -25,3 +26,6 @@ pub use project_transpose::{
2526
project_join_transpose::ProjectionPullUpJoin,
2627
project_merge::ProjectMergeRule,
2728
};
29+
pub use subquery::{
30+
DepInitialDistinct, DepJoinEliminateAtScan, DepJoinPastAgg, DepJoinPastFilter, DepJoinPastProj,
31+
};

optd-datafusion-repr/src/rules/filter_pushdown.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
//! At a high level, filter pushdown is responsible for pushing the filter node
99
//! further down the query plan whenever it is possible to do so.
1010
11+
use core::panic;
1112
use std::collections::{HashMap, HashSet};
1213
use std::vec;
1314

@@ -64,6 +65,10 @@ fn determine_join_cond_dep(
6465
left_col = true;
6566
} else if index >= left_schema_size && index < left_schema_size + right_schema_size {
6667
right_col = true;
68+
} else {
69+
panic!(
70+
"Column index {index} out of bounds {left_schema_size} + {right_schema_size}"
71+
);
6772
}
6873
}
6974
}
@@ -238,7 +243,7 @@ fn filter_join_transpose(
238243
match location {
239244
JoinCondDependency::Left => left_conds.push(expr),
240245
JoinCondDependency::Right => right_conds.push(
241-
expr.rewrite_column_refs(&|idx| {
246+
expr.rewrite_column_refs(&mut |idx| {
242247
Some(LogicalJoin::map_through_join(
243248
idx,
244249
left_schema_size,

optd-datafusion-repr/src/rules/joins.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ fn apply_join_commute(
2929
let right_schema = optimizer.get_property::<SchemaPropertyBuilder>(Arc::new(right.clone()), 0);
3030
let cond = Expr::from_rel_node(cond.into())
3131
.unwrap()
32-
.rewrite_column_refs(&|idx| {
32+
.rewrite_column_refs(&mut |idx| {
3333
Some(if idx < left_schema.len() {
3434
idx + right_schema.len()
3535
} else {
@@ -129,7 +129,7 @@ fn apply_join_assoc(
129129

130130
let cond2 = Expr::from_rel_node(cond2.into()).unwrap();
131131

132-
let Some(cond2) = cond2.rewrite_column_refs(&|idx| {
132+
let Some(cond2) = cond2.rewrite_column_refs(&mut |idx| {
133133
if idx < a_schema.len() {
134134
None
135135
} else {

optd-datafusion-repr/src/rules/project_transpose/project_transpose_common.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ impl ProjectionMapping {
5454
/// Join { cond: #1=#4 }
5555
pub fn rewrite_join_cond(&self, cond: Expr, child_schema_len: usize) -> Expr {
5656
let schema_size = self.forward.len();
57-
cond.rewrite_column_refs(&|col_idx| {
57+
cond.rewrite_column_refs(&mut |col_idx| {
5858
if col_idx < schema_size {
5959
self.projection_col_maps_to(col_idx)
6060
} else {
@@ -79,7 +79,7 @@ impl ProjectionMapping {
7979
/// ---->
8080
/// Filter { cond: #1=0 and #4=1 }
8181
pub fn rewrite_filter_cond(&self, cond: Expr, is_added: bool) -> Expr {
82-
cond.rewrite_column_refs(&|col_idx| {
82+
cond.rewrite_column_refs(&mut |col_idx| {
8383
if is_added {
8484
self.original_col_maps_to(col_idx)
8585
} else {

0 commit comments

Comments
 (0)