Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 19 additions & 16 deletions datafusion/physical-plan/src/joins/hash_join/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -885,6 +885,11 @@ impl HashJoinExec {
true
}

/// Returns a reference to the column indices stored on this join.
///
/// NOTE(review): judging by how `try_pushdown_through_join` consumes this
/// slice, each entry appears to describe one join output column: the side
/// it originates from (`JoinSide`) and its index within that child —
/// confirm against the `ColumnIndex` definition.
pub fn column_indices(&self) -> &Vec<ColumnIndex> {
&self.column_indices
}

/// left (build) side which gets hashed
pub fn left(&self) -> &Arc<dyn ExecutionPlan> {
&self.left
Expand Down Expand Up @@ -1557,23 +1562,21 @@ impl ExecutionPlan for HashJoinExec {
return Ok(None);
}

// TODO: split by `col`/`JoinSide` instead so mark joins can also push down to children.
let schema = self.schema();
if !matches!(self.join_type(), JoinType::LeftMark | JoinType::RightMark)
&& let Some(JoinData {
projected_left_child,
projected_right_child,
join_filter,
join_on,
}) = try_pushdown_through_join(
projection,
self.left(),
self.right(),
self.on(),
&schema,
self.filter(),
)?
{
if let Some(JoinData {
projected_left_child,
projected_right_child,
join_filter,
join_on,
}) = try_pushdown_through_join(
projection,
self.left(),
self.right(),
self.on(),
&schema,
self.filter(),
self.column_indices(),
)? {
self.builder()
.with_new_children(vec![
Arc::new(projected_left_child),
Expand Down
35 changes: 19 additions & 16 deletions datafusion/physical-plan/src/joins/nested_loop_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,11 @@ impl NestedLoopJoinExec {
.build()
}

/// Returns a reference to the column indices stored on this join.
///
/// NOTE(review): judging by how `try_pushdown_through_join` consumes this
/// slice, each entry appears to describe one join output column: the side
/// it originates from (`JoinSide`) and its index within that child —
/// confirm against the `ColumnIndex` definition.
pub fn column_indices(&self) -> &Vec<ColumnIndex> {
&self.column_indices
}

/// left side
pub fn left(&self) -> &Arc<dyn ExecutionPlan> {
&self.left
Expand Down Expand Up @@ -733,23 +738,21 @@ impl ExecutionPlan for NestedLoopJoinExec {
return Ok(None);
}

// TODO: split by `col`/`JoinSide` instead so mark joins can also push down to children.
let schema = self.schema();
if !matches!(self.join_type(), JoinType::LeftMark | JoinType::RightMark)
&& let Some(JoinData {
projected_left_child,
projected_right_child,
join_filter,
..
}) = try_pushdown_through_join(
projection,
self.left(),
self.right(),
&[],
&schema,
self.filter(),
)?
{
if let Some(JoinData {
projected_left_child,
projected_right_child,
join_filter,
..
}) = try_pushdown_through_join(
projection,
self.left(),
self.right(),
&[],
&schema,
self.filter(),
self.column_indices(),
)? {
Ok(Some(Arc::new(NestedLoopJoinExec::try_new(
Arc::new(projected_left_child),
Arc::new(projected_right_child),
Expand Down
108 changes: 71 additions & 37 deletions datafusion/physical-plan/src/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -642,61 +642,71 @@ pub struct JoinData {
pub join_on: JoinOn,
}

pub fn try_pushdown_through_join(
pub(crate) fn try_pushdown_through_join(
projection: &ProjectionExec,
join_left: &Arc<dyn ExecutionPlan>,
join_right: &Arc<dyn ExecutionPlan>,
join_on: JoinOnRef,
schema: &SchemaRef,
filter: Option<&JoinFilter>,
column_indices: &[ColumnIndex],
) -> Result<Option<JoinData>> {
// Convert projected expressions to columns. We can not proceed if this is not possible.
let Some(projection_as_columns) = physical_to_column_exprs(projection.expr()) else {
return Ok(None);
};

let (far_right_left_col_ind, far_left_right_col_ind) =
join_table_borders(join_left.schema().fields().len(), &projection_as_columns);
if projection_as_columns.len() >= schema.fields().len() {
return Ok(None);
}
let mut left_proj: Vec<(Column, String)> = Vec::new();
let mut right_proj: Vec<(Column, String)> = Vec::new();
let mut seen_right = false;
for (col, alias) in &projection_as_columns {
let origin = &column_indices[col.index()];
match origin.side {
// Keep the "left block before right block" contiguity the current
// pushdown supports; a left column after a right one is "mixed".
JoinSide::Left => {
if seen_right {
return Ok(None);
}
left_proj.push((Column::new(col.name(), origin.index), alias.clone()));
}
JoinSide::Right => {
seen_right = true;
right_proj.push((Column::new(col.name(), origin.index), alias.clone()));
}
// Synthetic column (e.g. mark): belongs to neither child.
// Phase 2 declines; Phase 3 keeps it at the join output instead.
JoinSide::None => return Ok(None),
}
}

if !join_allows_pushdown(
&projection_as_columns,
schema,
far_right_left_col_ind,
far_left_right_col_ind,
) {
// Parity: neither side fully dropped.
if left_proj.is_empty() || right_proj.is_empty() {
return Ok(None);
}

// `left_proj` / `right_proj` carry *child* indices (from `column_indices`),
// so the shared `update_join_*` helpers must use a 0 column-index offset for
// both sides (the offset bridges child -> join-output index, which is the
// identity here).
let new_filter = if let Some(filter) = filter {
match update_join_filter(
&projection_as_columns[0..=far_right_left_col_ind as _],
&projection_as_columns[far_left_right_col_ind as _..],
filter,
join_left.schema().fields().len(),
) {
Some(updated_filter) => Some(updated_filter),
match update_join_filter(&left_proj, &right_proj, filter, 0) {
Some(updated) => Some(updated),
None => return Ok(None),
}
} else {
None
};

let Some(new_on) = update_join_on(
&projection_as_columns[0..=far_right_left_col_ind as _],
&projection_as_columns[far_left_right_col_ind as _..],
join_on,
join_left.schema().fields().len(),
) else {
let Some(new_on) = update_join_on(&left_proj, &right_proj, join_on, 0) else {
return Ok(None);
};

let (new_left, new_right) = new_join_children(
&projection_as_columns,
far_right_left_col_ind,
far_left_right_col_ind,
join_left,
join_right,
)?;
let (new_left, new_right) =
new_join_children_from_groups(&left_proj, &right_proj, join_left, join_right)?;

Ok(Some(JoinData {
projected_left_child: new_left,
Expand Down Expand Up @@ -880,6 +890,34 @@ pub fn new_join_children(
Ok((new_left, new_right))
}

/// Wraps each join child in a [`ProjectionExec`] built from projection columns
/// that have already been grouped by side.
///
/// The indices carried by `left_proj` / `right_proj` are applied as-is to the
/// corresponding child, i.e. they are expected to be *child*-relative (for
/// example taken from a join's `ColumnIndex`). In contrast to
/// [`new_join_children`], ownership of a column is not derived from its
/// position in the join output, so join schemas that are not a plain
/// `left ++ right` are handled as well (used by the schema-aware
/// `try_pushdown_through_join`).
///
/// Returns the `(left, right)` projections. The left projection is built
/// first; the first error reported by [`ProjectionExec::try_new`] is
/// propagated to the caller.
fn new_join_children_from_groups(
    left_proj: &[(Column, String)],
    right_proj: &[(Column, String)],
    left_child: &Arc<dyn ExecutionPlan>,
    right_child: &Arc<dyn ExecutionPlan>,
) -> Result<(ProjectionExec, ProjectionExec)> {
    /// Projects `input` onto the aliased columns in `group`.
    fn project_child(
        group: &[(Column, String)],
        input: &Arc<dyn ExecutionPlan>,
    ) -> Result<ProjectionExec> {
        let exprs = group.iter().map(|(column, alias)| ProjectionExpr {
            expr: Arc::new(Column::new(column.name(), column.index())) as _,
            alias: alias.clone(),
        });
        ProjectionExec::try_new(exprs, Arc::clone(input))
    }

    let new_left = project_child(left_proj, left_child)?;
    let new_right = project_child(right_proj, right_child)?;
    Ok((new_left, new_right))
}

/// Checks three conditions for pushing a projection down through a join:
/// - Projection must narrow the join output schema.
/// - Columns coming from left/right tables must be collected at the left/right
Expand Down Expand Up @@ -946,14 +984,10 @@ pub fn update_join_on(
.map(|(left, right)| (left, right))
.unzip();

let new_left_columns = new_columns_for_join_on(&left_idx, proj_left_exprs, 0);
let new_right_columns =
new_columns_for_join_on(&right_idx, proj_right_exprs, left_field_size);

match (new_left_columns, new_right_columns) {
(Some(left), Some(right)) => Some(left.into_iter().zip(right).collect()),
_ => None,
}
let new_left = new_columns_for_join_on(&left_idx, proj_left_exprs, 0)?;
let new_right =
new_columns_for_join_on(&right_idx, proj_right_exprs, left_field_size)?;
Some(new_left.into_iter().zip(new_right).collect())
}

/// Tries to update the column indices of a [`JoinFilter`] as if the input of
Expand Down
90 changes: 90 additions & 0 deletions datafusion/sqllogictest/test_files/subquery.slt
Original file line number Diff line number Diff line change
Expand Up @@ -1370,6 +1370,96 @@ where t1_id > 40 or not exists (select 1 from t2 where t2.t2_int > t1.t1_int)
33
44

##########
# Regression for https://github.com/apache/datafusion/issues/23010:
# a projection that selects / reorders a subset of columns over a mark join.
# Schema-aware projection pushdown (driven by ColumnIndex / JoinSide) must keep
# the synthetic `mark` column (JoinSide::None) at the join output while pushing
# the child columns down. These tests lock in the query results (which must stay
# stable across the refactor) and the current plan shape (the physical plan is
# expected to change once child pushdown is enabled for mark joins). They cover
# hash LeftMark, negated mark, and nested-loop mark joins.
##########

query TT
EXPLAIN SELECT t1_name, t1_id FROM t1
WHERE t1_id > 40 OR t1_id IN (SELECT t2_id FROM t2 WHERE t1_int > 0)
----
logical_plan
01)Projection: t1.t1_name, t1.t1_id
02)--Filter: t1.t1_id > Int32(40) OR __correlated_sq_1.mark
03)----Projection: t1.t1_id, t1.t1_name, __correlated_sq_1.mark
04)------LeftMark Join: t1.t1_id = __correlated_sq_1.t2_id Filter: t1.t1_int > Int32(0)
05)--------TableScan: t1 projection=[t1_id, t1_name, t1_int]
06)--------SubqueryAlias: __correlated_sq_1
07)----------TableScan: t2 projection=[t2_id]
physical_plan
01)FilterExec: t1_id@0 > 40 OR mark@2, projection=[t1_name@1, t1_id@0]
02)--HashJoinExec: mode=CollectLeft, join_type=RightMark, on=[(t2_id@0, t1_id@0)], filter=t1_int@0 > 0, projection=[t1_id@0, t1_name@1, mark@3]
03)----DataSourceExec: partitions=1, partition_sizes=[2]
04)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
05)------DataSourceExec: partitions=1, partition_sizes=[2]

query TI rowsort
SELECT t1_name, t1_id FROM t1
WHERE t1_id > 40 OR t1_id IN (SELECT t2_id FROM t2 WHERE t1_int > 0)
----
a 11
b 22
d 44

query TT
EXPLAIN SELECT t1_int, t1_name FROM t1
WHERE t1_id < 20 OR NOT EXISTS (SELECT 1 FROM t2 WHERE t1.t1_id = t2.t2_id)
----
logical_plan
01)Projection: t1.t1_int, t1.t1_name
02)--Filter: t1.t1_id < Int32(20) OR NOT __correlated_sq_1.mark
03)----LeftMark Join: t1.t1_id = __correlated_sq_1.t2_id
04)------TableScan: t1 projection=[t1_id, t1_name, t1_int]
05)------SubqueryAlias: __correlated_sq_1
06)--------TableScan: t2 projection=[t2_id]
physical_plan
01)FilterExec: t1_id@0 < 20 OR NOT mark@3, projection=[t1_int@2, t1_name@1]
02)--HashJoinExec: mode=CollectLeft, join_type=RightMark, on=[(t2_id@0, t1_id@0)]
03)----DataSourceExec: partitions=1, partition_sizes=[2]
04)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
05)------DataSourceExec: partitions=1, partition_sizes=[2]

query IT rowsort
SELECT t1_int, t1_name FROM t1
WHERE t1_id < 20 OR NOT EXISTS (SELECT 1 FROM t2 WHERE t1.t1_id = t2.t2_id)
----
1 a
3 c

query TT
EXPLAIN SELECT t1_name FROM t1
WHERE t1_id > 40 OR EXISTS (SELECT 1 FROM t2 WHERE t1.t1_int > t2.t2_int)
----
logical_plan
01)Projection: t1.t1_name
02)--Filter: t1.t1_id > Int32(40) OR __correlated_sq_1.mark
03)----Projection: t1.t1_id, t1.t1_name, __correlated_sq_1.mark
04)------LeftMark Join: Filter: t1.t1_int > __correlated_sq_1.t2_int
05)--------TableScan: t1 projection=[t1_id, t1_name, t1_int]
06)--------SubqueryAlias: __correlated_sq_1
07)----------TableScan: t2 projection=[t2_int]
physical_plan
01)FilterExec: t1_id@0 > 40 OR mark@2, projection=[t1_name@1]
02)--NestedLoopJoinExec: join_type=RightMark, filter=t1_int@0 > t2_int@1, projection=[t1_id@0, t1_name@1, mark@3]
03)----DataSourceExec: partitions=1, partition_sizes=[2]
04)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
05)------DataSourceExec: partitions=1, partition_sizes=[2]

query T rowsort
SELECT t1_name FROM t1
WHERE t1_id > 40 OR EXISTS (SELECT 1 FROM t2 WHERE t1.t1_int > t2.t2_int)
----
b
c
d

statement ok
set datafusion.explain.logical_plan_only = true;

Expand Down
Loading