Skip to content

Commit c332b8b

Browse files
authored
fix(query): broadcast subquery for markjoin if condition has null values (#17706)
* fix(query): broadcast subquery for markjoin if condition has null values * fix(query): broadcast subquery for markjoin if condition has null values * update * update
1 parent 71e8e65 commit c332b8b

File tree

7 files changed

+303
-68
lines changed

7 files changed

+303
-68
lines changed

โ€Žsrc/query/sql/src/planner/optimizer/optimizers/operator/decorrelate/decorrelate.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -381,6 +381,18 @@ impl SubqueryRewriter {
381381
&mut left_conditions,
382382
&mut right_conditions,
383383
)?;
384+
385+
let mut is_null_equal = Vec::new();
386+
for (i, (l, r)) in left_conditions
387+
.iter()
388+
.zip(right_conditions.iter())
389+
.enumerate()
390+
{
391+
if l.data_type()?.is_nullable() || r.data_type()?.is_nullable() {
392+
is_null_equal.push(i);
393+
}
394+
}
395+
384396
let output_column = subquery.output_column.clone();
385397
let column_name = format!("subquery_{}", output_column.index);
386398
let right_condition = ScalarExpr::BoundColumnRef(BoundColumnRef {
@@ -403,6 +415,7 @@ impl SubqueryRewriter {
403415
params: vec![],
404416
arguments: vec![child_expr, right_condition],
405417
})];
418+
406419
let marker_index = if let Some(idx) = subquery.projection_index {
407420
idx
408421
} else {
@@ -412,11 +425,12 @@ impl SubqueryRewriter {
412425
None,
413426
)
414427
};
428+
415429
let mark_join = Join {
416430
equi_conditions: JoinEquiCondition::new_conditions(
417431
right_conditions,
418432
left_conditions,
419-
vec![],
433+
is_null_equal,
420434
),
421435
non_equi_conditions,
422436
join_type: JoinType::RightMark,

โ€Žsrc/query/sql/src/planner/optimizer/optimizers/operator/decorrelate/subquery_rewriter.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -391,6 +391,8 @@ impl SubqueryRewriter {
391391
),
392392
})
393393
} else if subquery.typ == SubqueryType::NotExists {
394+
// Not exists subquery should be rewritten to `not(is_true(column_ref))`
395+
// not null mark value will consider as: not [null] ---> not [false] ---> true
394396
ScalarExpr::FunctionCall(FunctionCall {
395397
span: subquery.span,
396398
func_name: "not".to_string(),
@@ -403,6 +405,7 @@ impl SubqueryRewriter {
403405
})],
404406
})
405407
} else if subquery.typ == SubqueryType::Exists {
408+
// null value will consider as false
406409
ScalarExpr::FunctionCall(FunctionCall {
407410
span: subquery.span,
408411
func_name: "is_true".to_string(),
@@ -671,14 +674,26 @@ impl SubqueryRewriter {
671674
None,
672675
)
673676
};
677+
678+
let mut is_null_equal = Vec::new();
679+
for (i, (l, r)) in left_conditions
680+
.iter()
681+
.zip(right_conditions.iter())
682+
.enumerate()
683+
{
684+
if l.data_type()?.is_nullable() || r.data_type()?.is_nullable() {
685+
is_null_equal.push(i);
686+
}
687+
}
688+
674689
// Consider the sql: select * from t1 where t1.a = any(select t2.a from t2);
675690
// Will be transferred to:select t1.a, t2.a, marker_index from t1, t2 where t2.a = t1.a;
676691
// Note that subquery is the right table, and it'll be the build side.
677692
let mark_join = Join {
678693
equi_conditions: JoinEquiCondition::new_conditions(
679694
right_conditions,
680695
left_conditions,
681-
vec![],
696+
is_null_equal,
682697
),
683698
non_equi_conditions,
684699
join_type: JoinType::RightMark,

โ€Žsrc/query/sql/src/planner/optimizer/optimizers/rule/rewrite/push_down_filter_join/mark_join_to_semi_join.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,11 @@ use crate::ScalarExpr;
2525
pub fn convert_mark_to_semi_join(s_expr: &SExpr) -> Result<(SExpr, bool)> {
2626
let mut filter: Filter = s_expr.plan().clone().try_into()?;
2727
let mut join: Join = s_expr.child(0)?.plan().clone().try_into()?;
28+
2829
let has_disjunction = filter.predicates.iter().any(
2930
|predicate| matches!(predicate, ScalarExpr::FunctionCall(func) if func.func_name == "or"),
3031
);
32+
3133
if !join.join_type.is_mark_join() || has_disjunction {
3234
return Ok((s_expr.clone(), false));
3335
}
@@ -63,6 +65,11 @@ pub fn convert_mark_to_semi_join(s_expr: &SExpr) -> Result<(SExpr, bool)> {
6365
_ => unreachable!(),
6466
};
6567

68+
// clear is null equal sign
69+
join.equi_conditions.iter_mut().for_each(|c| {
70+
c.is_null_equal = false;
71+
});
72+
6673
let s_join_expr = s_expr.child(0)?;
6774
let mut result = SExpr::create_binary(
6875
Arc::new(join.into()),

โ€Žsrc/query/sql/src/planner/plans/join.rs

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ pub struct HashJoinBuildCacheInfo {
155155
pub struct JoinEquiCondition {
156156
pub left: ScalarExpr,
157157
pub right: ScalarExpr,
158-
// Used for "is (not) distinct from".
158+
// Used for "is (not) distinct from" and mark join
159159
pub is_null_equal: bool,
160160
}
161161

@@ -669,6 +669,47 @@ impl Operator for Join {
669669
) -> Result<Vec<Vec<RequiredProperty>>> {
670670
let mut children_required = vec![];
671671

672+
// For mark join with nullable eq comparison, ensure to use broadcast for subquery side
673+
if self.join_type.is_mark_join()
674+
&& self.equi_conditions.len() == 1
675+
&& self.has_null_equi_condition()
676+
{
677+
// subquery as left probe side
678+
if matches!(self.join_type, JoinType::LeftMark) {
679+
let conditions = self
680+
.equi_conditions
681+
.iter()
682+
.map(|condition| condition.right.clone())
683+
.collect();
684+
685+
children_required.push(vec![
686+
RequiredProperty {
687+
distribution: Distribution::Broadcast,
688+
},
689+
RequiredProperty {
690+
distribution: Distribution::Hash(conditions),
691+
},
692+
]);
693+
} else {
694+
// subquery as right build side
695+
let conditions = self
696+
.equi_conditions
697+
.iter()
698+
.map(|condition| condition.left.clone())
699+
.collect();
700+
701+
children_required.push(vec![
702+
RequiredProperty {
703+
distribution: Distribution::Hash(conditions),
704+
},
705+
RequiredProperty {
706+
distribution: Distribution::Broadcast,
707+
},
708+
]);
709+
}
710+
return Ok(children_required);
711+
}
712+
672713
let settings = ctx.get_settings();
673714
if self.join_type != JoinType::Cross && !settings.get_enforce_broadcast_join()? {
674715
// (Hash, Hash)
Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
statement ok
2+
create or replace database d_subquery;
3+
4+
statement ok
5+
use d_subquery;
6+
7+
statement ok
8+
CREATE OR REPLACE TABLE t1 (a int);
9+
10+
statement ok
11+
CREATE OR REPLACE TABLE t2 (a int);
12+
13+
statement ok
14+
INSERT INTO t1 VALUES (1), (2), (NULL);
15+
16+
statement ok
17+
INSERT INTO t2 VALUES (2), (NULL);
18+
19+
20+
query T
21+
explain SELECT t1.a, EXISTS(SELECT 1 FROM t2 WHERE t2.a = t1.a), not EXISTS(SELECT 1 FROM t2 WHERE t2.a = t1.a) AS has_match
22+
FROM t1;
23+
----
24+
Exchange
25+
โ”œโ”€โ”€ output columns: [t1.a (#0), EXISTS (SELECT 1 FROM t2 WHERE t2.a = t1.a) (#5), has_match (#6)]
26+
โ”œโ”€โ”€ exchange type: Merge
27+
โ””โ”€โ”€ EvalScalar
28+
โ”œโ”€โ”€ output columns: [t1.a (#0), EXISTS (SELECT 1 FROM t2 WHERE t2.a = t1.a) (#5), has_match (#6)]
29+
โ”œโ”€โ”€ expressions: [is_true(7 (#7)), NOT is_true(8 (#8))]
30+
โ”œโ”€โ”€ estimated rows: 3.00
31+
โ””โ”€โ”€ HashJoin
32+
โ”œโ”€โ”€ output columns: [t1.a (#0), marker (#7), marker (#8)]
33+
โ”œโ”€โ”€ join type: RIGHT MARK
34+
โ”œโ”€โ”€ build keys: [a (#3)]
35+
โ”œโ”€โ”€ probe keys: [a (#0)]
36+
โ”œโ”€โ”€ keys is null equal: [true]
37+
โ”œโ”€โ”€ filters: []
38+
โ”œโ”€โ”€ estimated rows: 3.00
39+
โ”œโ”€โ”€ Exchange(Build)
40+
โ”‚ โ”œโ”€โ”€ output columns: [t2.a (#3)]
41+
โ”‚ โ”œโ”€โ”€ exchange type: Broadcast
42+
โ”‚ โ””โ”€โ”€ Filter
43+
โ”‚ โ”œโ”€โ”€ output columns: [t2.a (#3)]
44+
โ”‚ โ”œโ”€โ”€ filters: [is_true(outer.a (#3) = outer.a (#3))]
45+
โ”‚ โ”œโ”€โ”€ estimated rows: 0.40
46+
โ”‚ โ””โ”€โ”€ TableScan
47+
โ”‚ โ”œโ”€โ”€ table: default.d_subquery.t2
48+
โ”‚ โ”œโ”€โ”€ output columns: [a (#3)]
49+
โ”‚ โ”œโ”€โ”€ read rows: 2
50+
โ”‚ โ”œโ”€โ”€ read size: < 1 KiB
51+
โ”‚ โ”œโ”€โ”€ partitions total: 1
52+
โ”‚ โ”œโ”€โ”€ partitions scanned: 1
53+
โ”‚ โ”œโ”€โ”€ pruning stats: [segments: <range pruning: 1 to 1>, blocks: <range pruning: 1 to 1>]
54+
โ”‚ โ”œโ”€โ”€ push downs: [filters: [is_true(t2.a (#3) = t2.a (#3))], limit: NONE]
55+
โ”‚ โ””โ”€โ”€ estimated rows: 2.00
56+
โ””โ”€โ”€ Exchange(Probe)
57+
โ”œโ”€โ”€ output columns: [t1.a (#0), marker (#7)]
58+
โ”œโ”€โ”€ exchange type: Hash(a (#0))
59+
โ””โ”€โ”€ HashJoin
60+
โ”œโ”€โ”€ output columns: [t1.a (#0), marker (#7)]
61+
โ”œโ”€โ”€ join type: RIGHT MARK
62+
โ”œโ”€โ”€ build keys: [a (#1)]
63+
โ”œโ”€โ”€ probe keys: [a (#0)]
64+
โ”œโ”€โ”€ keys is null equal: [true]
65+
โ”œโ”€โ”€ filters: []
66+
โ”œโ”€โ”€ estimated rows: 3.00
67+
โ”œโ”€โ”€ Exchange(Build)
68+
โ”‚ โ”œโ”€โ”€ output columns: [t2.a (#1)]
69+
โ”‚ โ”œโ”€โ”€ exchange type: Broadcast
70+
โ”‚ โ””โ”€โ”€ Filter
71+
โ”‚ โ”œโ”€โ”€ output columns: [t2.a (#1)]
72+
โ”‚ โ”œโ”€โ”€ filters: [is_true(outer.a (#1) = outer.a (#1))]
73+
โ”‚ โ”œโ”€โ”€ estimated rows: 0.40
74+
โ”‚ โ””โ”€โ”€ TableScan
75+
โ”‚ โ”œโ”€โ”€ table: default.d_subquery.t2
76+
โ”‚ โ”œโ”€โ”€ output columns: [a (#1)]
77+
โ”‚ โ”œโ”€โ”€ read rows: 2
78+
โ”‚ โ”œโ”€โ”€ read size: < 1 KiB
79+
โ”‚ โ”œโ”€โ”€ partitions total: 1
80+
โ”‚ โ”œโ”€โ”€ partitions scanned: 1
81+
โ”‚ โ”œโ”€โ”€ pruning stats: [segments: <range pruning: 1 to 1>, blocks: <range pruning: 1 to 1>]
82+
โ”‚ โ”œโ”€โ”€ push downs: [filters: [is_true(t2.a (#1) = t2.a (#1))], limit: NONE]
83+
โ”‚ โ””โ”€โ”€ estimated rows: 2.00
84+
โ””โ”€โ”€ Exchange(Probe)
85+
โ”œโ”€โ”€ output columns: [t1.a (#0)]
86+
โ”œโ”€โ”€ exchange type: Hash(a (#0))
87+
โ””โ”€โ”€ TableScan
88+
โ”œโ”€โ”€ table: default.d_subquery.t1
89+
โ”œโ”€โ”€ output columns: [a (#0)]
90+
โ”œโ”€โ”€ read rows: 3
91+
โ”œโ”€โ”€ read size: < 1 KiB
92+
โ”œโ”€โ”€ partitions total: 1
93+
โ”œโ”€โ”€ partitions scanned: 1
94+
โ”œโ”€โ”€ pruning stats: [segments: <range pruning: 1 to 1>, blocks: <range pruning: 1 to 1>]
95+
โ”œโ”€โ”€ push downs: [filters: [], limit: NONE]
96+
โ””โ”€โ”€ estimated rows: 3.00
97+
98+
query T
99+
explain SELECT
100+
t1.a,
101+
t1.a IN (SELECT a FROM t2) AS in_match,
102+
t1.a NOT IN (SELECT a FROM t2) AS not_in_match
103+
FROM t1;
104+
----
105+
Exchange
106+
โ”œโ”€โ”€ output columns: [t1.a (#0), in_match (#3), not_in_match (#4)]
107+
โ”œโ”€โ”€ exchange type: Merge
108+
โ””โ”€โ”€ EvalScalar
109+
โ”œโ”€โ”€ output columns: [t1.a (#0), in_match (#3), not_in_match (#4)]
110+
โ”œโ”€โ”€ expressions: [NOT 5 (#5)]
111+
โ”œโ”€โ”€ estimated rows: 3.00
112+
โ””โ”€โ”€ HashJoin
113+
โ”œโ”€โ”€ output columns: [t1.a (#0), in_match (#3), marker (#5)]
114+
โ”œโ”€โ”€ join type: RIGHT MARK
115+
โ”œโ”€โ”€ build keys: [subquery_2 (#2)]
116+
โ”œโ”€โ”€ probe keys: [t1.a (#0)]
117+
โ”œโ”€โ”€ keys is null equal: [true]
118+
โ”œโ”€โ”€ filters: []
119+
โ”œโ”€โ”€ estimated rows: 3.00
120+
โ”œโ”€โ”€ Exchange(Build)
121+
โ”‚ โ”œโ”€โ”€ output columns: [t2.a (#2)]
122+
โ”‚ โ”œโ”€โ”€ exchange type: Broadcast
123+
โ”‚ โ””โ”€โ”€ TableScan
124+
โ”‚ โ”œโ”€โ”€ table: default.d_subquery.t2
125+
โ”‚ โ”œโ”€โ”€ output columns: [a (#2)]
126+
โ”‚ โ”œโ”€โ”€ read rows: 2
127+
โ”‚ โ”œโ”€โ”€ read size: < 1 KiB
128+
โ”‚ โ”œโ”€โ”€ partitions total: 1
129+
โ”‚ โ”œโ”€โ”€ partitions scanned: 1
130+
โ”‚ โ”œโ”€โ”€ pruning stats: [segments: <range pruning: 1 to 1>, blocks: <range pruning: 1 to 1>]
131+
โ”‚ โ”œโ”€โ”€ push downs: [filters: [], limit: NONE]
132+
โ”‚ โ””โ”€โ”€ estimated rows: 2.00
133+
โ””โ”€โ”€ Exchange(Probe)
134+
โ”œโ”€โ”€ output columns: [t1.a (#0), in_match (#3)]
135+
โ”œโ”€โ”€ exchange type: Hash(t1.a (#0))
136+
โ””โ”€โ”€ HashJoin
137+
โ”œโ”€โ”€ output columns: [t1.a (#0), in_match (#3)]
138+
โ”œโ”€โ”€ join type: RIGHT MARK
139+
โ”œโ”€โ”€ build keys: [subquery_1 (#1)]
140+
โ”œโ”€โ”€ probe keys: [t1.a (#0)]
141+
โ”œโ”€โ”€ keys is null equal: [true]
142+
โ”œโ”€โ”€ filters: []
143+
โ”œโ”€โ”€ estimated rows: 3.00
144+
โ”œโ”€โ”€ Exchange(Build)
145+
โ”‚ โ”œโ”€โ”€ output columns: [t2.a (#1)]
146+
โ”‚ โ”œโ”€โ”€ exchange type: Broadcast
147+
โ”‚ โ””โ”€โ”€ TableScan
148+
โ”‚ โ”œโ”€โ”€ table: default.d_subquery.t2
149+
โ”‚ โ”œโ”€โ”€ output columns: [a (#1)]
150+
โ”‚ โ”œโ”€โ”€ read rows: 2
151+
โ”‚ โ”œโ”€โ”€ read size: < 1 KiB
152+
โ”‚ โ”œโ”€โ”€ partitions total: 1
153+
โ”‚ โ”œโ”€โ”€ partitions scanned: 1
154+
โ”‚ โ”œโ”€โ”€ pruning stats: [segments: <range pruning: 1 to 1>, blocks: <range pruning: 1 to 1>]
155+
โ”‚ โ”œโ”€โ”€ push downs: [filters: [], limit: NONE]
156+
โ”‚ โ””โ”€โ”€ estimated rows: 2.00
157+
โ””โ”€โ”€ Exchange(Probe)
158+
โ”œโ”€โ”€ output columns: [t1.a (#0)]
159+
โ”œโ”€โ”€ exchange type: Hash(t1.a (#0))
160+
โ””โ”€โ”€ TableScan
161+
โ”œโ”€โ”€ table: default.d_subquery.t1
162+
โ”œโ”€โ”€ output columns: [a (#0)]
163+
โ”œโ”€โ”€ read rows: 3
164+
โ”œโ”€โ”€ read size: < 1 KiB
165+
โ”œโ”€โ”€ partitions total: 1
166+
โ”œโ”€โ”€ partitions scanned: 1
167+
โ”œโ”€โ”€ pruning stats: [segments: <range pruning: 1 to 1>, blocks: <range pruning: 1 to 1>]
168+
โ”œโ”€โ”€ push downs: [filters: [], limit: NONE]
169+
โ””โ”€โ”€ estimated rows: 3.00
170+
171+
172+
statement ok
173+
drop database d_subquery

0 commit comments

Comments
ย (0)