Skip to content
This repository was archived by the owner on Jan 7, 2025. It is now read-only.

Commit 3b0e6b7

Browse files
jurplel and AveryQi115 authored
feat: Filter Pushdown Rule (#140)
This PR brings a filter pushdown heuristic rule, built on @AveryQi115's hybrid scheme. ### Filter Pushdown Rule - This series of rules matches on any filter, and pushes any part of the filter down below a node if possible. - They are registered in the HeuristicsRuleWrapper currently, but they work as cost-based rules as well. ### Helper Functions - `LogOpExpr::new_flattened_nested_logical` creates a new `LogOpExpr` from an `ExprList`, and it flattens any nested `LogOpExpr`s of the same `LogOpType`. - `Expr::rewrite_column_refs` recursively rewrites any `ColumnRefExpr` in an expression tree, using a provided `rewrite_fn`. - `LogicalJoin::map_through_join` takes in left/right schema sizes, and maps a column index to what it would be if it were pushed down to the left or right side of a join. - `LogicalProjection::compute_column_mapping` creates a `ProjectionMapping` object from a projection's `ExprList`. - The `ProjectionMapping` object has a few methods, but most importantly it has `rewrite_condition`, which given an expr, will rewrite the expression with the projection's mapping. ### Testing Utilities - `new_test_optimizer` creates a new heuristic optimizer, which applies a given rule. It uses a `TpchCatalog`. - `TpchCatalog` is a catalog implementing a couple of tables from the TPC-H schema. It can be extended to have more as needed. - `DummyCostModel` implements a cost model, only giving zero cost. It is used for constructing a cascades optimizer without a real cost model, and isn't used in this PR. - This pull request, using these test optimizer components, pioneers a new testing scheme, based on running a constructed query plan through an optimizer, rather than text-based SQL planner tests, which may be flaky. They also test rules in isolation. --------- Signed-off-by: AveryQi115 <[email protected]> Co-authored-by: AveryQi115 <[email protected]>
1 parent d732a10 commit 3b0e6b7

File tree

15 files changed

+1464
-542
lines changed

15 files changed

+1464
-542
lines changed

optd-datafusion-bridge/src/into_optd.rs

+8-33
Original file line numberDiff line numberDiff line change
@@ -15,28 +15,6 @@ use optd_datafusion_repr::properties::schema::Schema as OPTDSchema;
1515

1616
use crate::OptdPlanContext;
1717

18-
// flatten_nested_logical is a helper function that flattens nested logical operators with the same op type
19-
// e.g. (a AND (b AND c)) => ExprList([a, b, c])
20-
//      (a OR (b OR c)) => ExprList([a, b, c])
21-
// It assumes the children of the input expr_list are already flattened,
22-
// so it can only be used in a bottom-up manner
23-
fn flatten_nested_logical(op: LogOpType, expr_list: ExprList) -> ExprList {
24-
// conv_into_optd_expr builds the children bottom up, so there is no need to
25-
// call flatten_nested_logical recursively; only one level is inlined here
26-
let mut new_expr_list = Vec::new();
27-
for child in expr_list.to_vec() {
28-
if let OptRelNodeTyp::LogOp(child_op) = child.typ() {
29-
if child_op == op {
30-
let child_log_op_expr = LogOpExpr::from_rel_node(child.into_rel_node()).unwrap();
31-
new_expr_list.extend(child_log_op_expr.children().to_vec());
32-
continue;
33-
}
34-
}
35-
new_expr_list.push(child.clone());
36-
}
37-
ExprList::new(new_expr_list)
38-
}
39-
4018
impl OptdPlanContext<'_> {
4119
fn conv_into_optd_table_scan(&mut self, node: &logical_plan::TableScan) -> Result<PlanNode> {
4220
let table_name = node.table_name.to_string();
@@ -73,14 +51,16 @@ impl OptdPlanContext<'_> {
7351
Operator::And => {
7452
let op = LogOpType::And;
7553
let expr_list = ExprList::new(vec![left, right]);
76-
let expr_list = flatten_nested_logical(op, expr_list);
77-
return Ok(LogOpExpr::new(op, expr_list).into_expr());
54+
return Ok(
55+
LogOpExpr::new_flattened_nested_logical(op, expr_list).into_expr()
56+
);
7857
}
7958
Operator::Or => {
8059
let op = LogOpType::Or;
8160
let expr_list = ExprList::new(vec![left, right]);
82-
let expr_list = flatten_nested_logical(op, expr_list);
83-
return Ok(LogOpExpr::new(op, expr_list).into_expr());
61+
return Ok(
62+
LogOpExpr::new_flattened_nested_logical(op, expr_list).into_expr()
63+
);
8464
}
8565
_ => {}
8666
}
@@ -331,13 +311,8 @@ impl OptdPlanContext<'_> {
331311
} else {
332312
let expr_list = ExprList::new(log_ops);
333313
// the expr from filter is already flattened in conv_into_optd_expr
334-
let expr_list = flatten_nested_logical(LogOpType::And, expr_list);
335-
Ok(LogicalJoin::new(
336-
left,
337-
right,
338-
LogOpExpr::new(LogOpType::And, expr_list).into_expr(),
339-
join_type,
340-
))
314+
let log_op = LogOpExpr::new_flattened_nested_logical(LogOpType::And, expr_list);
315+
Ok(LogicalJoin::new(left, right, log_op.into_expr(), join_type))
341316
}
342317
}
343318

optd-datafusion-repr/src/lib.rs

+23-2
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,10 @@ use properties::{
2323
};
2424
use rules::{
2525
EliminateDuplicatedAggExprRule, EliminateDuplicatedSortExprRule, EliminateFilterRule,
26-
EliminateJoinRule, EliminateLimitRule, HashJoinRule, JoinAssocRule, JoinCommuteRule,
27-
PhysicalConversionRule, ProjectionPullUpJoin, SimplifyFilterRule, SimplifyJoinCondRule,
26+
EliminateJoinRule, EliminateLimitRule, FilterAggTransposeRule, FilterCrossJoinTransposeRule,
27+
FilterInnerJoinTransposeRule, FilterMergeRule, FilterProjectTransposeRule,
28+
FilterSortTransposeRule, HashJoinRule, JoinAssocRule, JoinCommuteRule, PhysicalConversionRule,
29+
ProjectionPullUpJoin, SimplifyFilterRule, SimplifyJoinCondRule,
2830
};
2931

3032
pub use optd_core::rel_node::Value;
@@ -34,6 +36,8 @@ mod explain;
3436
pub mod plan_nodes;
3537
pub mod properties;
3638
pub mod rules;
39+
#[cfg(test)]
40+
mod testing;
3741

3842
pub struct DatafusionOptimizer {
3943
hueristic_optimizer: HeuristicsOptimizer<OptRelNodeTyp>,
@@ -92,6 +96,23 @@ impl DatafusionOptimizer {
9296
for rule in rules {
9397
rule_wrappers.push(RuleWrapper::new_cascades(rule));
9498
}
99+
// add all filter pushdown rules as heuristic rules
100+
rule_wrappers.push(RuleWrapper::new_heuristic(Arc::new(
101+
FilterProjectTransposeRule::new(),
102+
)));
103+
rule_wrappers.push(RuleWrapper::new_heuristic(Arc::new(FilterMergeRule::new())));
104+
rule_wrappers.push(RuleWrapper::new_heuristic(Arc::new(
105+
FilterCrossJoinTransposeRule::new(),
106+
)));
107+
rule_wrappers.push(RuleWrapper::new_heuristic(Arc::new(
108+
FilterInnerJoinTransposeRule::new(),
109+
)));
110+
rule_wrappers.push(RuleWrapper::new_heuristic(Arc::new(
111+
FilterSortTransposeRule::new(),
112+
)));
113+
rule_wrappers.push(RuleWrapper::new_heuristic(Arc::new(
114+
FilterAggTransposeRule::new(),
115+
)));
95116
rule_wrappers.push(RuleWrapper::new_cascades(Arc::new(HashJoinRule::new()))); // 17
96117
rule_wrappers.push(RuleWrapper::new_cascades(Arc::new(JoinCommuteRule::new()))); // 18
97118
rule_wrappers.push(RuleWrapper::new_cascades(Arc::new(JoinAssocRule::new())));

optd-datafusion-repr/src/plan_nodes.rs

+51
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use std::fmt::Debug;
1616
use std::sync::Arc;
1717

1818
use arrow_schema::DataType;
19+
use itertools::Itertools;
1920
use optd_core::{
2021
cascades::{CascadesOptimizer, GroupId},
2122
rel_node::{RelNode, RelNodeMeta, RelNodeMetaMap, RelNodeRef, RelNodeTyp},
@@ -284,6 +285,56 @@ impl Expr {
284285
pub fn child(&self, idx: usize) -> OptRelNodeRef {
285286
self.0.child(idx)
286287
}
288+
289+
/// Recursively rewrite all column references in the expression.using a provided
290+
/// function that replaces a column index.
291+
/// The provided function will, given a ColumnRefExpr's index,
292+
/// return either Some(usize) or None.
293+
/// - If it is Some, the column index can be rewritten with the value.
294+
/// - If any of the columns is None, we will return None all the way up
295+
/// the call stack, and no expression will be returned.
296+
pub fn rewrite_column_refs(
297+
&self,
298+
rewrite_fn: &impl Fn(usize) -> Option<usize>,
299+
) -> Option<Self> {
300+
assert!(self.typ().is_expression());
301+
if let OptRelNodeTyp::ColumnRef = self.typ() {
302+
let col_ref = ColumnRefExpr::from_rel_node(self.0.clone()).unwrap();
303+
let rewritten = rewrite_fn(col_ref.index());
304+
return if let Some(rewritten_idx) = rewritten {
305+
let new_col_ref = ColumnRefExpr::new(rewritten_idx);
306+
Some(Self(new_col_ref.into_rel_node()))
307+
} else {
308+
None
309+
};
310+
}
311+
312+
let children = self.0.children.clone();
313+
let children = children
314+
.into_iter()
315+
.map(|child| {
316+
if child.typ == OptRelNodeTyp::List {
317+
// TODO: What should we do with List?
318+
return Some(child);
319+
}
320+
Expr::from_rel_node(child.clone())
321+
.unwrap()
322+
.rewrite_column_refs(rewrite_fn)
323+
.map(|x| x.into_rel_node())
324+
})
325+
.collect::<Option<Vec<_>>>()?;
326+
Some(
327+
Expr::from_rel_node(
328+
RelNode {
329+
typ: self.0.typ.clone(),
330+
children: children.into_iter().collect_vec(),
331+
data: self.0.data.clone(),
332+
}
333+
.into(),
334+
)
335+
.unwrap(),
336+
)
337+
}
287338
}
288339

289340
impl OptRelNode for Expr {

optd-datafusion-repr/src/plan_nodes/expr.rs

+23
Original file line numberDiff line numberDiff line change
@@ -608,6 +608,29 @@ impl LogOpExpr {
608608
))
609609
}
610610

611+
/// flatten_nested_logical is a helper function to flatten nested logical operators with same op type
612+
/// eg. (a AND (b AND c)) => ExprList([a, b, c])
613+
/// (a OR (b OR c)) => ExprList([a, b, c])
614+
/// It assume the children of the input expr_list are already flattened
615+
/// and can only be used in bottom up manner
616+
pub fn new_flattened_nested_logical(op: LogOpType, expr_list: ExprList) -> Self {
617+
// Since we assume that we are building the children bottom up,
618+
// there is no need to call flatten_nested_logical recursively
619+
let mut new_expr_list = Vec::new();
620+
for child in expr_list.to_vec() {
621+
if let OptRelNodeTyp::LogOp(child_op) = child.typ() {
622+
if child_op == op {
623+
let child_log_op_expr =
624+
LogOpExpr::from_rel_node(child.into_rel_node()).unwrap();
625+
new_expr_list.extend(child_log_op_expr.children().to_vec());
626+
continue;
627+
}
628+
}
629+
new_expr_list.push(child.clone());
630+
}
631+
LogOpExpr::new(op, ExprList::new(new_expr_list))
632+
}
633+
611634
pub fn children(&self) -> Vec<Expr> {
612635
self.0
613636
.0

optd-datafusion-repr/src/plan_nodes/join.rs

+17
Original file line numberDiff line numberDiff line change
@@ -62,3 +62,20 @@ define_plan_node!(
6262
{ 3, right_keys: ExprList }
6363
], { join_type: JoinType }
6464
);
65+
66+
impl LogicalJoin {
67+
/// Maps a column index of a join's output to the index it would have if it
/// were pushed down to the side of the join it belongs to.
///
/// Indices below `left_schema_size` belong to the left side and are returned
/// unchanged; all other indices belong to the right side and are shifted
/// down by `left_schema_size`.
///
/// # Panics
/// Panics if `col_idx` is not smaller than
/// `left_schema_size + right_schema_size`.
pub fn map_through_join(
    col_idx: usize,
    left_schema_size: usize,
    right_schema_size: usize,
) -> usize {
    assert!(col_idx < left_schema_size + right_schema_size);
    // `checked_sub` yields `None` exactly when the column is on the left side.
    match col_idx.checked_sub(left_schema_size) {
        Some(right_idx) => right_idx,
        None => col_idx,
    }
}
81+
}

optd-datafusion-repr/src/plan_nodes/projection.rs

+72-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use super::expr::ExprList;
22
use super::macros::define_plan_node;
33

4-
use super::{OptRelNode, OptRelNodeRef, OptRelNodeTyp, PlanNode};
4+
use super::{ColumnRefExpr, Expr, OptRelNode, OptRelNodeRef, OptRelNodeTyp, PlanNode};
55

66
#[derive(Clone, Debug)]
77
pub struct LogicalProjection(pub PlanNode);
@@ -26,3 +26,74 @@ define_plan_node!(
2626
{ 1, exprs: ExprList }
2727
]
2828
);
29+
30+
/// This struct holds the mapping from original columns to projected columns.
31+
///
32+
/// # Example
33+
/// With the following plan:
34+
/// | Filter (#0 < 5)
35+
/// |
36+
/// |-| Projection [#2, #3]
37+
/// |- Scan [#0, #1, #2, #3]
38+
///
39+
/// The computed projection mapping is:
40+
/// #2 -> #0
41+
/// #3 -> #1
42+
pub struct ProjectionMapping {
43+
forward: Vec<usize>,
44+
_backward: Vec<Option<usize>>,
45+
}
46+
47+
impl ProjectionMapping {
48+
pub fn build(mapping: Vec<usize>) -> Option<Self> {
49+
let mut backward = vec![];
50+
for (i, &x) in mapping.iter().enumerate() {
51+
if x >= backward.len() {
52+
backward.resize(x + 1, None);
53+
}
54+
backward[x] = Some(i);
55+
}
56+
Some(Self {
57+
forward: mapping,
58+
_backward: backward,
59+
})
60+
}
61+
62+
pub fn projection_col_refers_to(&self, col: usize) -> usize {
63+
self.forward[col]
64+
}
65+
66+
pub fn _original_col_maps_to(&self, col: usize) -> Option<usize> {
67+
self._backward[col]
68+
}
69+
70+
/// Recursively rewrites all ColumnRefs in an Expr to *undo* the projection
71+
/// condition. You might want to do this if you are pushing something
72+
/// through a projection, or pulling a projection up.
73+
///
74+
/// # Example
75+
/// If we have a projection node, mapping column A to column B (A -> B)
76+
/// All B's in `cond` will be rewritten as A.
77+
pub fn rewrite_condition(&self, cond: Expr, child_schema_len: usize) -> Expr {
78+
let proj_schema_size = self.forward.len();
79+
cond.rewrite_column_refs(&|idx| {
80+
Some(if idx < proj_schema_size {
81+
self.projection_col_refers_to(idx)
82+
} else {
83+
idx - proj_schema_size + child_schema_len
84+
})
85+
})
86+
.unwrap()
87+
}
88+
}
89+
90+
impl LogicalProjection {
91+
pub fn compute_column_mapping(exprs: &ExprList) -> Option<ProjectionMapping> {
92+
let mut mapping = vec![];
93+
for expr in exprs.to_vec() {
94+
let col_expr = ColumnRefExpr::from_rel_node(expr.into_rel_node())?;
95+
mapping.push(col_expr.index());
96+
}
97+
ProjectionMapping::build(mapping)
98+
}
99+
}

optd-datafusion-repr/src/rules.rs

+5
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
mod eliminate_duplicated_expr;
33
mod eliminate_limit;
44
mod filter;
5+
mod filter_pushdown;
56
mod joins;
67
mod macros;
78
mod physical;
@@ -12,6 +13,10 @@ pub use eliminate_duplicated_expr::{
1213
};
1314
pub use eliminate_limit::EliminateLimitRule;
1415
pub use filter::{EliminateFilterRule, SimplifyFilterRule, SimplifyJoinCondRule};
16+
pub use filter_pushdown::{
17+
FilterAggTransposeRule, FilterCrossJoinTransposeRule, FilterInnerJoinTransposeRule,
18+
FilterMergeRule, FilterProjectTransposeRule, FilterSortTransposeRule,
19+
};
1520
pub use joins::{
1621
EliminateJoinRule, HashJoinRule, JoinAssocRule, JoinCommuteRule, ProjectionPullUpJoin,
1722
};

0 commit comments

Comments
 (0)