@@ -116,14 +116,21 @@ fn apply_dep_initial_distinct(
116
116
117
117
// Our join condition is going to make sure that all of the correlated columns
118
118
// in the right side are equal to their equivalent columns in the left side.
119
- // (they will have the same index, just shifted over)
119
+ //
120
+ // If we have correlated columns [#16, #17], we want our condition to be:
121
+ // #16 = #0 AND #17 = #1
122
+ //
123
+ // This is because the aggregate we install on the right side will map the
124
+ // correlated columns to their respective indices as shown.
120
125
let join_cond = LogOpPred :: new (
121
126
LogOpType :: And ,
122
- ( 0 ..correlated_col_indices. len ( ) )
123
- . map ( |i| {
127
+ correlated_col_indices
128
+ . iter ( )
129
+ . enumerate ( )
130
+ . map ( |( i, x) | {
124
131
assert ! ( i + left_schema_size < left_schema_size + right_schema_size) ;
125
132
BinOpPred :: new (
126
- ColumnRefPred :: new ( i ) . into_pred_node ( ) ,
133
+ ColumnRefPred :: new ( * x ) . into_pred_node ( ) ,
127
134
ColumnRefPred :: new ( i + left_schema_size) . into_pred_node ( ) ,
128
135
BinOpType :: Eq ,
129
136
)
@@ -177,16 +184,18 @@ fn apply_dep_join_past_proj(
177
184
let cond = join. cond ( ) ;
178
185
let extern_cols = join. extern_cols ( ) ;
179
186
let proj = LogicalProjection :: from_plan_node ( right. unwrap_plan_node ( ) ) . unwrap ( ) ;
187
+ let proj_exprs = proj. exprs ( ) ;
180
188
let right = proj. child ( ) ;
181
189
182
190
// TODO: can we have external columns in projection node? I don't think so?
183
191
// Cross join should always have true cond
184
192
assert ! ( cond == ConstantPred :: bool ( true ) . into_pred_node( ) ) ;
185
193
let left_schema_len = optimizer. get_schema_of ( left. clone ( ) ) . len ( ) ;
186
- let right_schema_len = optimizer. get_schema_of ( right. clone ( ) ) . len ( ) ;
187
194
188
- let right_cols_proj =
189
- ( 0 ..right_schema_len) . map ( |x| ColumnRefPred :: new ( x + left_schema_len) . into_pred_node ( ) ) ;
195
+ let right_cols_proj = proj_exprs. to_vec ( ) . into_iter ( ) . map ( |x| {
196
+ x. rewrite_column_refs ( |col| Some ( col + left_schema_len) )
197
+ . unwrap ( )
198
+ } ) ;
190
199
191
200
let left_cols_proj = ( 0 ..left_schema_len) . map ( |x| ColumnRefPred :: new ( x) . into_pred_node ( ) ) ;
192
201
let new_proj_exprs = ListPred :: new (
@@ -281,7 +290,7 @@ define_rule!(
281
290
/// talk by Mark Raasveldt. The correlated columns are covered in the original paper.
282
291
///
283
292
/// TODO: the outer join is not implemented yet, so some edge cases won't work.
284
- /// Run SQList tests to catch these, I guess.
293
+ /// Run SQLite tests to catch these, I guess.
285
294
fn apply_dep_join_past_agg (
286
295
_optimizer : & impl Optimizer < DfNodeType > ,
287
296
binding : ArcDfPlanNode ,
@@ -310,15 +319,14 @@ fn apply_dep_join_past_agg(
310
319
} )
311
320
. collect :: < Vec < _ > > ( ) ;
312
321
322
+ // We need to group by all correlated columns.
323
+ // In our initial distinct step, we installed an agg node that groups by all correlated columns.
324
+ // Keeping this in mind, we only need to append a sequential number for each correlated column,
325
+ // as these will correspond to the outputs of the agg node.
313
326
let new_groups = ListPred :: new (
314
- groups
315
- . to_vec ( )
316
- . into_iter ( )
317
- . map ( |x| {
318
- x. rewrite_column_refs ( |col| Some ( col + correlated_col_indices. len ( ) ) )
319
- . unwrap ( )
320
- } )
321
- . chain ( correlated_col_indices. iter ( ) . map ( |x| {
327
+ ( 0 ..correlated_col_indices. len ( ) )
328
+ . map ( |x| ColumnRefPred :: new ( x) . into_pred_node ( ) )
329
+ . chain ( groups. to_vec ( ) . into_iter ( ) . map ( |x| {
322
330
x. rewrite_column_refs ( |col| Some ( col + correlated_col_indices. len ( ) ) )
323
331
. unwrap ( )
324
332
} ) )
0 commit comments