From 5f26d36e432a64aa8bb37fd9e694eb886a0ab2a2 Mon Sep 17 00:00:00 2001 From: Benjamin Owad Date: Thu, 5 Dec 2024 19:29:08 -0500 Subject: [PATCH] Filter/Agg transpose bugfix (#252) The check for whether a filter expression can be pushed below an agg node was incorrect. It tested whether each column reference's index appeared in the agg's `groups` list (comparing the numbers for equality). However, a filter sitting above an agg node refers to the agg's *output* schema, so the check must instead verify, by position, that every referenced column is one the agg node *emits* as a group-by column. For example, if we see: ``` Filter #1 > 100 Agg { groups: [#1], agg: Sum() } ``` We should *not* push down because `#1` refers to the sum column. In the current main branch, it is pushed down because it sees that `#1` equals a column in the `groups` field. It should be checking that every column is `< groups.len()` instead. --- .../src/rules/filter_pushdown.rs | 5 +- optd-sqllogictest/slt/_basic_tables.slt.part | 11 +++ optd-sqllogictest/slt/unnest-dup.slt | 9 +++ .../subqueries/subquery_unnesting.planner.sql | 74 +++++++++---------- 4 files changed, 60 insertions(+), 39 deletions(-) create mode 100644 optd-sqllogictest/slt/_basic_tables.slt.part create mode 100644 optd-sqllogictest/slt/unnest-dup.slt diff --git a/optd-datafusion-repr/src/rules/filter_pushdown.rs b/optd-datafusion-repr/src/rules/filter_pushdown.rs index 760e222a..c576b844 100644 --- a/optd-datafusion-repr/src/rules/filter_pushdown.rs +++ b/optd-datafusion-repr/src/rules/filter_pushdown.rs @@ -346,7 +346,10 @@ fn apply_filter_agg_transpose( let mut group_by_cols_only = true; for child in children { if let Some(col_ref) = ColumnRefPred::from_pred_node(child.clone()) { - if !group_cols.contains(&col_ref.index()) { + // The agg schema is (group columns) + (expr columns), + // so if the column ref is < group_cols.len(), it is + // a group column. 
+ if col_ref.index() >= group_cols.len() { group_by_cols_only = false; break; } diff --git a/optd-sqllogictest/slt/_basic_tables.slt.part b/optd-sqllogictest/slt/_basic_tables.slt.part new file mode 100644 index 00000000..2fa9b935 --- /dev/null +++ b/optd-sqllogictest/slt/_basic_tables.slt.part @@ -0,0 +1,11 @@ +statement ok +create table t1(v1 int, v2 int); + +statement ok +create table t2(v3 int, v4 int); + +statement ok +insert into t1 values (1, 100), (2, 200), (2, 250), (3, 300), (3, 300); + +statement ok +insert into t2 values (2, 200), (2, 250), (3, 300); diff --git a/optd-sqllogictest/slt/unnest-dup.slt b/optd-sqllogictest/slt/unnest-dup.slt new file mode 100644 index 00000000..fb4b3e46 --- /dev/null +++ b/optd-sqllogictest/slt/unnest-dup.slt @@ -0,0 +1,9 @@ +include _basic_tables.slt.part + +query +select * from t1 where (select sum(v4) from t2 where v3 = v1) > 100; +---- +2 200 +2 250 +3 300 +3 300 diff --git a/optd-sqlplannertest/tests/subqueries/subquery_unnesting.planner.sql b/optd-sqlplannertest/tests/subqueries/subquery_unnesting.planner.sql index a9a92dc9..b6b0bbec 100644 --- a/optd-sqlplannertest/tests/subqueries/subquery_unnesting.planner.sql +++ b/optd-sqlplannertest/tests/subqueries/subquery_unnesting.planner.sql @@ -53,25 +53,24 @@ LogicalProjection { exprs: [ #0, #1 ] } ├── LogicalAgg { exprs: [], groups: [ #0 ] } │ └── LogicalScan { table: t1 } └── LogicalScan { table: t2 } -PhysicalProjection { exprs: [ #2, #3 ], cost: {compute=8019,io=3000}, stat: {row_cnt=1} } -└── PhysicalHashJoin { join_type: Inner, left_keys: [ #0 ], right_keys: [ #0 ], cost: {compute=8016,io=3000}, stat: {row_cnt=1} } - ├── PhysicalAgg - │ ├── aggrs:Agg(Sum) - │ │ └── [ Cast { cast_to: Int64, child: #2 } ] - │ ├── groups: [ #1 ] - │ ├── cost: {compute=7014,io=2000} +PhysicalProjection { exprs: [ #2, #3 ], cost: {compute=18005,io=3000}, stat: {row_cnt=1} } +└── PhysicalHashJoin { join_type: Inner, left_keys: [ #0 ], right_keys: [ #0 ], cost: {compute=18002,io=3000}, 
stat: {row_cnt=1} } + ├── PhysicalFilter + │ ├── cond:Gt + │ │ ├── #1 + │ │ └── 100(i64) + │ ├── cost: {compute=17000,io=2000} │ ├── stat: {row_cnt=1} - │ └── PhysicalProjection { exprs: [ #2, #0, #1 ], cost: {compute=7006,io=2000}, stat: {row_cnt=1} } - │ └── PhysicalHashJoin { join_type: Inner, left_keys: [ #0 ], right_keys: [ #0 ], cost: {compute=7002,io=2000}, stat: {row_cnt=1} } - │ ├── PhysicalFilter - │ │ ├── cond:Gt - │ │ │ ├── #0 - │ │ │ └── 100(i64) - │ │ ├── cost: {compute=3000,io=1000} - │ │ ├── stat: {row_cnt=1} - │ │ └── PhysicalScan { table: t2, cost: {compute=0,io=1000}, stat: {row_cnt=1000} } - │ └── PhysicalAgg { aggrs: [], groups: [ #0 ], cost: {compute=3000,io=1000}, stat: {row_cnt=1000} } - │ └── PhysicalScan { table: t1, cost: {compute=0,io=1000}, stat: {row_cnt=1000} } + │ └── PhysicalAgg + │ ├── aggrs:Agg(Sum) + │ │ └── [ Cast { cast_to: Int64, child: #2 } ] + │ ├── groups: [ #1 ] + │ ├── cost: {compute=14000,io=2000} + │ ├── stat: {row_cnt=1000} + │ └── PhysicalHashJoin { join_type: Inner, left_keys: [ #0 ], right_keys: [ #0 ], cost: {compute=6000,io=2000}, stat: {row_cnt=1000} } + │ ├── PhysicalAgg { aggrs: [], groups: [ #0 ], cost: {compute=3000,io=1000}, stat: {row_cnt=1000} } + │ │ └── PhysicalScan { table: t1, cost: {compute=0,io=1000}, stat: {row_cnt=1000} } + │ └── PhysicalScan { table: t2, cost: {compute=0,io=1000}, stat: {row_cnt=1000} } └── PhysicalScan { table: t1, cost: {compute=0,io=1000}, stat: {row_cnt=1000} } */ @@ -135,27 +134,26 @@ LogicalProjection { exprs: [ #0, #1 ] } └── LogicalJoin { join_type: Cross, cond: true } ├── LogicalScan { table: t2 } └── LogicalScan { table: t3 } -PhysicalProjection { exprs: [ #2, #3 ], cost: {compute=9021,io=4000}, stat: {row_cnt=1} } -└── PhysicalHashJoin { join_type: Inner, left_keys: [ #0 ], right_keys: [ #0 ], cost: {compute=9018,io=4000}, stat: {row_cnt=1} } - ├── PhysicalAgg - │ ├── aggrs:Agg(Sum) - │ │ └── [ Cast { cast_to: Int64, child: #2 } ] - │ ├── groups: [ #1 ] - │ ├── cost: 
{compute=8016,io=3000} +PhysicalProjection { exprs: [ #2, #3 ], cost: {compute=21005,io=4000}, stat: {row_cnt=1} } +└── PhysicalHashJoin { join_type: Inner, left_keys: [ #0 ], right_keys: [ #0 ], cost: {compute=21002,io=4000}, stat: {row_cnt=1} } + ├── PhysicalFilter + │ ├── cond:Gt + │ │ ├── #1 + │ │ └── 100(i64) + │ ├── cost: {compute=20000,io=3000} │ ├── stat: {row_cnt=1} - │ └── PhysicalHashJoin { join_type: Inner, left_keys: [ #2 ], right_keys: [ #0 ], cost: {compute=8008,io=3000}, stat: {row_cnt=1} } - │ ├── PhysicalProjection { exprs: [ #2, #0, #1 ], cost: {compute=7006,io=2000}, stat: {row_cnt=1} } - │ │ └── PhysicalHashJoin { join_type: Inner, left_keys: [ #0 ], right_keys: [ #0 ], cost: {compute=7002,io=2000}, stat: {row_cnt=1} } - │ │ ├── PhysicalFilter - │ │ │ ├── cond:Gt - │ │ │ │ ├── #0 - │ │ │ │ └── 100(i64) - │ │ │ ├── cost: {compute=3000,io=1000} - │ │ │ ├── stat: {row_cnt=1} - │ │ │ └── PhysicalScan { table: t2, cost: {compute=0,io=1000}, stat: {row_cnt=1000} } - │ │ └── PhysicalAgg { aggrs: [], groups: [ #0 ], cost: {compute=3000,io=1000}, stat: {row_cnt=1000} } - │ │ └── PhysicalScan { table: t1, cost: {compute=0,io=1000}, stat: {row_cnt=1000} } - │ └── PhysicalScan { table: t3, cost: {compute=0,io=1000}, stat: {row_cnt=1000} } + │ └── PhysicalAgg + │ ├── aggrs:Agg(Sum) + │ │ └── [ Cast { cast_to: Int64, child: #2 } ] + │ ├── groups: [ #1 ] + │ ├── cost: {compute=17000,io=3000} + │ ├── stat: {row_cnt=1000} + │ └── PhysicalHashJoin { join_type: Inner, left_keys: [ #2 ], right_keys: [ #0 ], cost: {compute=9000,io=3000}, stat: {row_cnt=1000} } + │ ├── PhysicalHashJoin { join_type: Inner, left_keys: [ #0 ], right_keys: [ #0 ], cost: {compute=6000,io=2000}, stat: {row_cnt=1000} } + │ │ ├── PhysicalAgg { aggrs: [], groups: [ #0 ], cost: {compute=3000,io=1000}, stat: {row_cnt=1000} } + │ │ │ └── PhysicalScan { table: t1, cost: {compute=0,io=1000}, stat: {row_cnt=1000} } + │ │ └── PhysicalScan { table: t2, cost: {compute=0,io=1000}, stat: {row_cnt=1000} 
} + │ └── PhysicalScan { table: t3, cost: {compute=0,io=1000}, stat: {row_cnt=1000} } └── PhysicalScan { table: t1, cost: {compute=0,io=1000}, stat: {row_cnt=1000} } */