Skip to content
This repository was archived by the owner on Jan 7, 2025. It is now read-only.

Commit 61b6e49

Browse files
authored
fix(df-repr): depjoin elimination rule (#212)
This is not the most efficient way to implement this, but at least it works. Reimplemented the eliminate-depjoin rule so that it removes the dependent join and inserts a regular join when there are no correlated columns. This is done by recursively inspecting the right side of the plan tree for any `ExternColumnRef` node. Because that inspection has to walk a concrete plan tree (it cannot look through optimizer placeholder nodes), this eliminate-depjoin rule must be used as a heuristic rule. Added a new subquery regression test and enabled TPC-H 1-5 to use the optd logical optimizer. --------- Signed-off-by: Alex Chi <[email protected]>
1 parent 36e28ee commit 61b6e49

File tree

12 files changed

+308
-210
lines changed

12 files changed

+308
-210
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

optd-datafusion-bridge/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,4 @@ async-recursion = "1"
1616
futures-lite = "2"
1717
futures-util = "0.3"
1818
itertools = "0.11"
19+
tracing = "0.1"

optd-datafusion-bridge/src/lib.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ impl OptdQueryPlanner {
132132
}));
133133
}
134134
let mut optd_rel = ctx.conv_into_optd(logical_plan)?;
135+
135136
if let Some(explains) = &mut explains {
136137
explains.push(StringifiedPlan::new(
137138
PlanType::OptimizedLogicalPlan {
@@ -142,9 +143,17 @@ impl OptdQueryPlanner {
142143
.explain_to_string(None),
143144
));
144145
}
146+
147+
tracing::trace!(
148+
optd_unoptimized_plan = %("\n".to_string()
149+
+ &PlanNode::from_rel_node(optd_rel.clone())
150+
.unwrap()
151+
.explain_to_string(None)));
152+
145153
let mut optimizer = self.optimizer.lock().unwrap().take().unwrap();
146154

147155
if optimizer.is_heuristic_enabled() {
156+
// TODO: depjoin pushdown might need to run multiple times
148157
optd_rel = optimizer.heuristic_optimize(optd_rel);
149158
if let Some(explains) = &mut explains {
150159
explains.push(StringifiedPlan::new(
@@ -156,6 +165,11 @@ impl OptdQueryPlanner {
156165
.explain_to_string(None),
157166
))
158167
}
168+
tracing::trace!(
169+
optd_optimized_plan = %("\n".to_string()
170+
+ &PlanNode::from_rel_node(optd_rel.clone())
171+
.unwrap()
172+
.explain_to_string(None)));
159173
}
160174

161175
let (group_id, optimized_rel, meta) = optimizer.cascades_optimize(optd_rel)?;
@@ -180,6 +194,13 @@ impl OptdQueryPlanner {
180194
join_orders.iter().map(|x| x.to_string()).join("\n"),
181195
));
182196
}
197+
198+
tracing::trace!(
199+
optd_physical_plan = %("\n".to_string()
200+
+ &PlanNode::from_rel_node(optimized_rel.clone())
201+
.unwrap()
202+
.explain_to_string(None)));
203+
183204
ctx.optimizer = Some(&optimizer);
184205
let physical_plan = ctx.conv_from_optd(optimized_rel, meta).await?;
185206
if let Some(explains) = &mut explains {

optd-datafusion-repr/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ use rules::{
3131
pub use optd_core::rel_node::Value;
3232

3333
use crate::rules::{
34-
DepInitialDistinct, DepJoinEliminateAtScan, DepJoinPastAgg, DepJoinPastFilter, DepJoinPastProj,
34+
DepInitialDistinct, DepJoinEliminate, DepJoinPastAgg, DepJoinPastFilter, DepJoinPastProj,
3535
};
3636

3737
pub use memo_ext::{LogicalJoinOrder, MemoExt};
@@ -93,7 +93,7 @@ impl DatafusionOptimizer {
9393
Arc::new(EliminateLimitRule::new()),
9494
Arc::new(EliminateDuplicatedSortExprRule::new()),
9595
Arc::new(EliminateDuplicatedAggExprRule::new()),
96-
Arc::new(DepJoinEliminateAtScan::new()),
96+
Arc::new(DepJoinEliminate::new()),
9797
Arc::new(DepInitialDistinct::new()),
9898
Arc::new(DepJoinPastProj::new()),
9999
Arc::new(DepJoinPastFilter::new()),

optd-datafusion-repr/src/rules.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,5 +27,5 @@ pub use project_transpose::{
2727
project_merge::{EliminateProjectRule, ProjectMergeRule},
2828
};
2929
pub use subquery::{
30-
DepInitialDistinct, DepJoinEliminateAtScan, DepJoinPastAgg, DepJoinPastFilter, DepJoinPastProj,
30+
DepInitialDistinct, DepJoinEliminate, DepJoinPastAgg, DepJoinPastFilter, DepJoinPastProj,
3131
};
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
pub mod depjoin_pushdown;
22

33
pub use depjoin_pushdown::{
4-
DepInitialDistinct, DepJoinEliminateAtScan, DepJoinPastAgg, DepJoinPastFilter, DepJoinPastProj,
4+
DepInitialDistinct, DepJoinEliminate, DepJoinPastAgg, DepJoinPastFilter, DepJoinPastProj,
55
};

optd-datafusion-repr/src/rules/subquery/depjoin_pushdown.rs

Lines changed: 29 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -418,8 +418,10 @@ fn apply_dep_join_past_agg(
418418
vec![new_agg.into_rel_node().as_ref().clone()]
419419
}
420420

421+
// Heuristics-only rule. If we don't have references to the external columns on the right side,
422+
// we can rewrite the dependent join into a normal join.
421423
define_rule!(
422-
DepJoinEliminateAtScan,
424+
DepJoinEliminate,
423425
apply_dep_join_eliminate_at_scan, // TODO matching is all wrong
424426
(DepJoin(JoinType::Cross), left, right, [cond], [extern_cols])
425427
);
@@ -428,31 +430,40 @@ define_rule!(
428430
/// for an inner join! Our main mission is complete!
429431
fn apply_dep_join_eliminate_at_scan(
430432
_optimizer: &impl Optimizer<OptRelNodeTyp>,
431-
DepJoinEliminateAtScanPicks {
433+
DepJoinEliminatePicks {
432434
left,
433435
right,
434436
cond,
435437
extern_cols: _,
436-
}: DepJoinEliminateAtScanPicks,
438+
}: DepJoinEliminatePicks,
437439
) -> Vec<RelNode<OptRelNodeTyp>> {
438-
// TODO: Is there ever a situation we need to detect that we can convert earlier?
439-
// Technically we can convert as soon as we clear the last externcolumnref...
440-
441440
// Cross join should always have true cond
442441
assert!(cond == *ConstantExpr::bool(true).into_rel_node());
443442

444-
if right.typ != OptRelNodeTyp::Scan {
445-
return vec![];
443+
fn inspect(node: &RelNode<OptRelNodeTyp>) -> bool {
444+
if matches!(node.typ, OptRelNodeTyp::Placeholder(_)) {
445+
unimplemented!("this is a heuristics rule");
446+
}
447+
if node.typ == OptRelNodeTyp::ExternColumnRef {
448+
return false;
449+
}
450+
for child in &node.children {
451+
if !inspect(child) {
452+
return false;
453+
}
454+
}
455+
true
446456
}
447457

448-
// let scan = LogicalScan::new("test".to_string()).into_rel_node();
449-
450-
let new_join = LogicalJoin::new(
451-
PlanNode::from_group(left.into()),
452-
PlanNode::from_group(right.into()),
453-
ConstantExpr::bool(true).into_expr(),
454-
JoinType::Inner,
455-
);
456-
457-
vec![new_join.into_rel_node().as_ref().clone()]
458+
if inspect(&right) {
459+
let new_join = LogicalJoin::new(
460+
PlanNode::from_group(left.into()),
461+
PlanNode::from_group(right.into()),
462+
ConstantExpr::bool(true).into_expr(),
463+
JoinType::Inner,
464+
);
465+
vec![new_join.into_rel_node().as_ref().clone()]
466+
} else {
467+
vec![]
468+
}
458469
}

optd-sqlplannertest/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,5 +51,5 @@ Currently we have the following options for the explain task:
5151
## Tracing a query
5252

5353
```
54-
RUST_LOG=optd_core=trace cargo run -p optd-sqlplannertest --bin planner_test_apply -- pushdowns &> log
54+
RUST_BACKTRACE=1 RUST_LOG=optd_core=trace,optd_datafusion_bridge=trace cargo run -p optd-sqlplannertest --bin planner_test_apply -- pushdowns &> log
5555
```

optd-sqlplannertest/tests/subqueries/subquery_unnesting.planner.sql

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,3 +75,87 @@ PhysicalProjection { exprs: [ #2, #3 ], cost: {compute=9147.220000000001,io=3000
7575
└── PhysicalScan { table: t1, cost: {compute=0,io=1000}, stat: {row_cnt=1000} }
7676
*/
7777

78+
-- Test whether the optimizer can unnest correlated subqueries.
79+
select * from t1 where (select sum(t2v3) from (select * from t2, t3 where t2v1 = t1v1 and t2v3 = t3v2)) > 100;
80+
81+
/*
82+
LogicalProjection { exprs: [ #0, #1 ] }
83+
└── LogicalFilter
84+
├── cond:Gt
85+
│ ├── #2
86+
│ └── 100(i64)
87+
└── RawDependentJoin { join_type: Cross, cond: true, extern_cols: [ Extern(#0) ] }
88+
├── LogicalScan { table: t1 }
89+
└── LogicalProjection { exprs: [ #0 ] }
90+
└── LogicalAgg
91+
├── exprs:Agg(Sum)
92+
│ └── [ Cast { cast_to: Int64, expr: #1 } ]
93+
├── groups: []
94+
└── LogicalProjection { exprs: [ #0, #1, #2, #3 ] }
95+
└── LogicalFilter
96+
├── cond:And
97+
│ ├── Eq
98+
│ │ ├── #0
99+
│ │ └── Extern(#0)
100+
│ └── Eq
101+
│ ├── #1
102+
│ └── #2
103+
└── LogicalJoin { join_type: Cross, cond: true }
104+
├── LogicalScan { table: t2 }
105+
└── LogicalScan { table: t3 }
106+
LogicalProjection { exprs: [ #0, #1 ] }
107+
└── LogicalFilter
108+
├── cond:Gt
109+
│ ├── #2
110+
│ └── 100(i64)
111+
└── LogicalProjection { exprs: [ #0, #1, #3 ] }
112+
└── LogicalJoin
113+
├── join_type: Inner
114+
├── cond:Eq
115+
│ ├── #0
116+
│ └── #2
117+
├── LogicalScan { table: t1 }
118+
└── LogicalProjection { exprs: [ #0, #1 ] }
119+
└── LogicalAgg
120+
├── exprs:Agg(Sum)
121+
│ └── [ Cast { cast_to: Int64, expr: #2 } ]
122+
├── groups: [ #1 ]
123+
└── LogicalProjection { exprs: [ #0, #1, #2, #3, #4 ] }
124+
└── LogicalFilter
125+
├── cond:And
126+
│ ├── Eq
127+
│ │ ├── #1
128+
│ │ └── #0
129+
│ └── Eq
130+
│ ├── #2
131+
│ └── #3
132+
└── LogicalJoin { join_type: Inner, cond: true }
133+
├── LogicalAgg { exprs: [], groups: [ #0 ] }
134+
│ └── LogicalScan { table: t1 }
135+
└── LogicalJoin { join_type: Cross, cond: true }
136+
├── LogicalScan { table: t2 }
137+
└── LogicalScan { table: t3 }
138+
PhysicalProjection { exprs: [ #2, #3 ], cost: {compute=10153.240000000002,io=4000}, stat: {row_cnt=1} }
139+
└── PhysicalHashJoin { join_type: Inner, left_keys: [ #0 ], right_keys: [ #0 ], cost: {compute=10145.220000000001,io=4000}, stat: {row_cnt=1} }
140+
├── PhysicalAgg
141+
│ ├── aggrs:Agg(Sum)
142+
│ │ └── [ Cast { cast_to: Int64, expr: #2 } ]
143+
│ ├── groups: [ #1 ]
144+
│ ├── cost: {compute=9139.2,io=3000}
145+
│ ├── stat: {row_cnt=1}
146+
│ └── PhysicalHashJoin { join_type: Inner, left_keys: [ #2 ], right_keys: [ #0 ], cost: {compute=9051.080000000002,io=3000}, stat: {row_cnt=1} }
147+
│ ├── PhysicalProjection { exprs: [ #2, #0, #1 ], cost: {compute=8045.06,io=2000}, stat: {row_cnt=1} }
148+
│ │ └── PhysicalHashJoin { join_type: Inner, left_keys: [ #0 ], right_keys: [ #0 ], cost: {compute=8033.04,io=2000}, stat: {row_cnt=1} }
149+
│ │ ├── PhysicalFilter
150+
│ │ │ ├── cond:Gt
151+
│ │ │ │ ├── #0
152+
│ │ │ │ └── 100(i64)
153+
│ │ │ ├── cost: {compute=5005,io=1000}
154+
│ │ │ ├── stat: {row_cnt=1}
155+
│ │ │ └── PhysicalScan { table: t2, cost: {compute=0,io=1000}, stat: {row_cnt=1000} }
156+
│ │ └── PhysicalAgg { aggrs: [], groups: [ #0 ], cost: {compute=2022.0199999999995,io=1000}, stat: {row_cnt=1000} }
157+
│ │ └── PhysicalScan { table: t1, cost: {compute=0,io=1000}, stat: {row_cnt=1000} }
158+
│ └── PhysicalScan { table: t3, cost: {compute=0,io=1000}, stat: {row_cnt=1000} }
159+
└── PhysicalScan { table: t1, cost: {compute=0,io=1000}, stat: {row_cnt=1000} }
160+
*/
161+

optd-sqlplannertest/tests/subqueries/subquery_unnesting.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,8 @@
1414
desc: Test whether the optimizer can unnest correlated subqueries.
1515
tasks:
1616
- explain[verbose]:logical_optd,optimized_logical_optd,physical_optd
17+
- sql: |
18+
select * from t1 where (select sum(t2v3) from (select * from t2, t3 where t2v1 = t1v1 and t2v3 = t3v2)) > 100;
19+
desc: Test whether the optimizer can unnest correlated subqueries.
20+
tasks:
21+
- explain[verbose]:logical_optd,optimized_logical_optd,physical_optd

0 commit comments

Comments
 (0)