Skip to content

Commit 980a16d

Browse files
authored
feat(query): support left plan's from clause contains subquery (#17621)
* init * eliminate duplicates rows * x * update * update * aggr * clean * update * union * fix * fix * test * fix * fix * fix * rename * ConstantTableScan * test * test * fix * fix union
1 parent 876998d commit 980a16d

File tree

20 files changed

+901
-439
lines changed

20 files changed

+901
-439
lines changed

src/query/catalog/src/plan/datasource/datasource_plan.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ use crate::plan::Partitions;
2626
use crate::plan::PushDownInfo;
2727
use crate::table_args::TableArgs;
2828

29-
// TODO: Delete the scan plan field, but it depends on plan_parser:L394
3029
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)]
3130
pub struct DataSourcePlan {
3231
pub source_info: DataSourceInfo,

src/query/service/tests/it/sql/planner/optimizer/data/yaml/q1.yaml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -220,12 +220,12 @@ optimized_plan: |
220220
│ ├── order by: []
221221
│ └── limit: NONE
222222
└── EvalScalar
223-
├── scalars: [sr_store_sk (#103) AS (#103), multiply(divide(sum(ctr_total_return) (#145), if(eq(count(ctr_total_return) (#146), 0), 1, count(ctr_total_return) (#146))), 1.2) AS (#147)]
223+
├── scalars: [outer.sr_store_sk (#103) AS (#103), multiply(divide(sum(ctr_total_return) (#145), if(eq(count(ctr_total_return) (#146), 0), 1, count(ctr_total_return) (#146))), 1.2) AS (#147)]
224224
└── Aggregate(Final)
225-
├── group items: [subquery_103 (#103)]
225+
├── group items: [outer.sr_store_sk (#103)]
226226
├── aggregate functions: [sum(ctr_total_return) (#145), count(ctr_total_return) (#146)]
227227
└── Aggregate(Partial)
228-
├── group items: [subquery_103 (#103)]
228+
├── group items: [outer.sr_store_sk (#103)]
229229
├── aggregate functions: [sum(ctr_total_return) (#145), count(ctr_total_return) (#146)]
230230
└── Aggregate(Final)
231231
├── group items: [store_returns.sr_customer_sk (#99), store_returns.sr_store_sk (#103)]

src/query/sql/src/executor/physical_plans/physical_constant_table_scan.rs

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,11 @@
1515
use databend_common_exception::Result;
1616
use databend_common_expression::Column;
1717
use databend_common_expression::DataSchemaRef;
18-
use databend_common_expression::DataSchemaRefExt;
1918

2019
use crate::executor::PhysicalPlan;
2120
use crate::executor::PhysicalPlanBuilder;
2221
use crate::ColumnSet;
22+
use crate::IndexType;
2323

2424
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
2525
pub struct ConstantTableScan {
@@ -50,20 +50,35 @@ impl PhysicalPlanBuilder {
5050
scan: &crate::plans::ConstantTableScan,
5151
required: ColumnSet,
5252
) -> Result<PhysicalPlan> {
53-
// 1. Prune unused Columns.
54-
let used: ColumnSet = required.intersection(&scan.columns).cloned().collect();
55-
let (values, fields) = if used == scan.columns {
56-
(scan.values.clone(), scan.schema.fields().clone())
57-
} else {
58-
let new_scan = scan.prune_columns(used);
59-
(new_scan.values.clone(), new_scan.schema.fields().clone())
60-
};
61-
// 2. Build physical plan.
53+
debug_assert!(scan
54+
.schema
55+
.fields
56+
.iter()
57+
.map(|field| field.name().parse::<IndexType>().unwrap())
58+
.collect::<ColumnSet>()
59+
.is_superset(&scan.columns));
60+
61+
let used: ColumnSet = required.intersection(&scan.columns).copied().collect();
62+
if used.len() < scan.columns.len() {
63+
let crate::plans::ConstantTableScan {
64+
values,
65+
num_rows,
66+
schema,
67+
..
68+
} = scan.prune_columns(used);
69+
return Ok(PhysicalPlan::ConstantTableScan(ConstantTableScan {
70+
plan_id: 0,
71+
values,
72+
num_rows,
73+
output_schema: schema,
74+
}));
75+
}
76+
6277
Ok(PhysicalPlan::ConstantTableScan(ConstantTableScan {
6378
plan_id: 0,
64-
values,
79+
values: scan.values.clone(),
6580
num_rows: scan.num_rows,
66-
output_schema: DataSchemaRefExt::create(fields),
81+
output_schema: scan.schema.clone(),
6782
}))
6883
}
6984
}

src/query/sql/src/planner/binder/bind_table_reference/bind_join.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -384,6 +384,7 @@ impl Binder {
384384
let mut decorrelator =
385385
SubqueryRewriter::new(self.ctx.clone(), self.metadata.clone(), Some(self.clone()));
386386
right_child = decorrelator.flatten_plan(
387+
&left_child,
387388
&right_child,
388389
&right_prop.outer_columns,
389390
&mut FlattenInfo {

src/query/sql/src/planner/format/display.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -162,10 +162,6 @@ impl<
162162
let mut tree = self
163163
.operator_humanizer
164164
.humanize_operator(self.id_humanizer, op);
165-
let children = s_expr
166-
.children()
167-
.map(|s_expr| self.humanize_s_expr(s_expr))
168-
.collect::<Result<Vec<_>>>()?;
169165

170166
if self.verbose {
171167
let rel_expr = RelExpr::with_s_expr(s_expr);
@@ -177,6 +173,11 @@ impl<
177173
tree.children.extend(stats);
178174
}
179175

176+
let children = s_expr
177+
.children()
178+
.map(|s_expr| self.humanize_s_expr(s_expr))
179+
.collect::<Result<Vec<_>>>()?;
180+
180181
tree.children.extend(children);
181182
Ok(tree)
182183
}

src/query/sql/src/planner/metadata.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -628,7 +628,7 @@ pub struct DerivedColumn {
628628
pub alias: String,
629629
pub data_type: DataType,
630630
// if the derived column is generated by the scalar expr, save the `scalar_expr`.
631-
// Currently, it's only used by decorrelating subquery.
631+
// Currently, it's only used by WindowRewriter.
632632
pub scalar_expr: Option<ScalarExpr>,
633633
}
634634

src/query/sql/src/planner/optimizer/decorrelate/decorrelate.rs

Lines changed: 31 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -261,17 +261,18 @@ impl SubqueryRewriter {
261261

262262
pub fn try_decorrelate_subquery(
263263
&mut self,
264-
left: &SExpr,
264+
outer: &SExpr,
265265
subquery: &SubqueryExpr,
266266
flatten_info: &mut FlattenInfo,
267267
is_conjunctive_predicate: bool,
268268
) -> Result<(SExpr, UnnestResult)> {
269269
match subquery.typ {
270270
SubqueryType::Scalar => {
271-
let correlated_columns = subquery.outer_columns.clone();
271+
let correlated_columns = &subquery.outer_columns;
272272
let flatten_plan = self.flatten_plan(
273+
outer,
273274
&subquery.subquery,
274-
&correlated_columns,
275+
correlated_columns,
275276
flatten_info,
276277
false,
277278
)?;
@@ -280,23 +281,23 @@ impl SubqueryRewriter {
280281
let mut right_conditions = Vec::with_capacity(correlated_columns.len());
281282
self.add_equi_conditions(
282283
subquery.span,
283-
&correlated_columns,
284+
correlated_columns,
284285
&mut right_conditions,
285286
&mut left_conditions,
286287
)?;
287288

288-
let mut join_type = JoinType::LeftSingle;
289-
if matches!(subquery.contain_agg, Some(true)) {
289+
let join_type = if matches!(subquery.contain_agg, Some(true)) && {
290290
let rel_expr = RelExpr::with_s_expr(&subquery.subquery);
291-
let card = rel_expr
291+
rel_expr
292292
.derive_cardinality()?
293293
.statistics
294-
.precise_cardinality;
295-
296-
if card.is_some() {
297-
join_type = JoinType::Left;
298-
}
299-
}
294+
.precise_cardinality
295+
.is_some()
296+
} {
297+
JoinType::Left
298+
} else {
299+
JoinType::LeftSingle
300+
};
300301

301302
let join_plan = Join {
302303
equi_conditions: JoinEquiCondition::new_conditions(
@@ -315,21 +316,22 @@ impl SubqueryRewriter {
315316
};
316317
let s_expr = SExpr::create_binary(
317318
Arc::new(join_plan.into()),
318-
Arc::new(left.clone()),
319+
Arc::new(outer.clone()),
319320
Arc::new(flatten_plan),
320321
);
321322
Ok((s_expr, UnnestResult::SingleJoin))
322323
}
323324
SubqueryType::Exists | SubqueryType::NotExists => {
324325
if is_conjunctive_predicate {
325-
if let Some(result) = self.try_decorrelate_simple_subquery(left, subquery)? {
326+
if let Some(result) = self.try_decorrelate_simple_subquery(outer, subquery)? {
326327
return Ok((result, UnnestResult::SimpleJoin { output_index: None }));
327328
}
328329
}
329-
let correlated_columns = subquery.outer_columns.clone();
330+
let correlated_columns = &subquery.outer_columns;
330331
let flatten_plan = self.flatten_plan(
332+
outer,
331333
&subquery.subquery,
332-
&correlated_columns,
334+
correlated_columns,
333335
flatten_info,
334336
false,
335337
)?;
@@ -338,7 +340,7 @@ impl SubqueryRewriter {
338340
let mut right_conditions = Vec::with_capacity(correlated_columns.len());
339341
self.add_equi_conditions(
340342
subquery.span,
341-
&correlated_columns,
343+
correlated_columns,
342344
&mut left_conditions,
343345
&mut right_conditions,
344346
)?;
@@ -379,24 +381,25 @@ impl SubqueryRewriter {
379381
};
380382
let s_expr = SExpr::create_binary(
381383
Arc::new(join_plan.into()),
382-
Arc::new(left.clone()),
384+
Arc::new(outer.clone()),
383385
Arc::new(flatten_plan),
384386
);
385387
Ok((s_expr, UnnestResult::MarkJoin { marker_index }))
386388
}
387389
SubqueryType::Any => {
388-
let correlated_columns = subquery.outer_columns.clone();
390+
let correlated_columns = &subquery.outer_columns;
389391
let flatten_plan = self.flatten_plan(
392+
outer,
390393
&subquery.subquery,
391-
&correlated_columns,
394+
correlated_columns,
392395
flatten_info,
393396
false,
394397
)?;
395398
let mut left_conditions = Vec::with_capacity(correlated_columns.len());
396399
let mut right_conditions = Vec::with_capacity(correlated_columns.len());
397400
self.add_equi_conditions(
398401
subquery.span,
399-
&correlated_columns,
402+
correlated_columns,
400403
&mut left_conditions,
401404
&mut right_conditions,
402405
)?;
@@ -450,7 +453,7 @@ impl SubqueryRewriter {
450453
Ok((
451454
SExpr::create_binary(
452455
Arc::new(mark_join),
453-
Arc::new(left.clone()),
456+
Arc::new(outer.clone()),
454457
Arc::new(flatten_plan),
455458
),
456459
UnnestResult::MarkJoin { marker_index },
@@ -467,23 +470,23 @@ impl SubqueryRewriter {
467470
left_conditions: &mut Vec<ScalarExpr>,
468471
right_conditions: &mut Vec<ScalarExpr>,
469472
) -> Result<()> {
470-
let mut correlated_columns = correlated_columns.clone().into_iter().collect::<Vec<_>>();
473+
let mut correlated_columns = correlated_columns.iter().copied().collect::<Vec<_>>();
471474
correlated_columns.sort();
472-
for correlated_column in correlated_columns.iter() {
475+
for correlated_column in correlated_columns {
473476
let metadata = self.metadata.read();
474-
let column_entry = metadata.column(*correlated_column);
477+
let column_entry = metadata.column(correlated_column);
475478
let right_column = ScalarExpr::BoundColumnRef(BoundColumnRef {
476479
span,
477480
column: ColumnBindingBuilder::new(
478481
column_entry.name(),
479-
*correlated_column,
482+
correlated_column,
480483
Box::from(column_entry.data_type()),
481484
Visibility::Visible,
482485
)
483486
.table_index(column_entry.table_index())
484487
.build(),
485488
});
486-
let Some(derive_column) = self.derived_columns.get(correlated_column) else {
489+
let Some(derive_column) = self.derived_columns.get(&correlated_column) else {
487490
continue;
488491
};
489492
let column_entry = metadata.column(*derive_column);

0 commit comments

Comments
 (0)