Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 6 additions & 10 deletions datafusion/physical-plan/src/joins/hash_join/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -862,14 +862,6 @@ impl HashJoinExec {
return false;
}

// Bounds and membership filters derived from the build side do not
// account for null-equal matching: a probe-side NULL key evaluates
// such predicates to NULL and would be pruned, even though it can
// match a build-side NULL when nulls compare equal.
if self.null_equality == NullEquality::NullEqualsNull {
return false;
}

// `preserve_file_partitions` can report Hash partitioning for Hive-style
// file groups, but those partitions are not actually hash-distributed.
// Partitioned dynamic filters rely on hash routing, so disable them in
Expand Down Expand Up @@ -1363,6 +1355,8 @@ impl ExecutionPlan for HashJoinExec {
filter,
on_right,
repartition_random_state,
self.null_equality,
self.null_aware,
))
})))
})
Expand Down Expand Up @@ -6628,7 +6622,7 @@ mod tests {
}

#[test]
fn test_dynamic_filter_pushdown_rejects_null_equal_join() -> Result<()> {
fn test_dynamic_filter_pushdown_allowed_for_null_equal_join() -> Result<()> {
let (_, _, on) = build_schema_and_on()?;
let left = build_table(("a1", &vec![1]), ("b1", &vec![1]), ("c1", &vec![1]));
let right = build_table(("a2", &vec![1]), ("b1", &vec![1]), ("c2", &vec![1]));
Expand All @@ -6651,7 +6645,9 @@ mod tests {
false,
)?;

assert!(!join.allow_join_dynamic_filter_pushdown(session_config.options()));
// Null-equal joins keep dynamic filter pushdown: the pushed predicate carries an
// `IS NULL` disjunct so a probe-side NULL still reaches the join.
assert!(join.allow_join_dynamic_filter_pushdown(session_config.options()));

Ok(())
}
Expand Down
119 changes: 115 additions & 4 deletions datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@ use crate::joins::hash_join::partitioned_hash_eval::{
use arrow::array::ArrayRef;
use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::config::ConfigOptions;
use datafusion_common::{DataFusionError, Result, ScalarValue, SharedResult};
use datafusion_common::{
DataFusionError, NullEquality, Result, ScalarValue, SharedResult,
};
use datafusion_expr::Operator;
use datafusion_functions::core::r#struct as struct_func;
use datafusion_physical_expr::expressions::{
BinaryExpr, CaseExpr, DynamicFilterPhysicalExpr, InListExpr, lit,
BinaryExpr, CaseExpr, DynamicFilterPhysicalExpr, InListExpr, IsNullExpr, lit,
};
use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef, ScalarFunctionExpr};

Expand Down Expand Up @@ -255,6 +257,12 @@ pub(crate) struct SharedBuildAccumulator {
repartition_random_state: SeededRandomState,
/// Schema of the probe (right) side for evaluating filter expressions
probe_schema: Arc<Schema>,
/// Null equality of the join. Under `NullEqualsNull` a probe-side NULL can match a
/// build-side NULL, so the pushed filter must keep NULL rows here too.
null_equality: NullEquality,
/// Null-aware anti join (`NOT IN`). A probe-side NULL must reach the join so its
/// three-valued logic can collapse the result, so the pushed filter keeps NULL rows.
null_aware: bool,
}

/// Strategy for filter pushdown (decided at collection time)
Expand Down Expand Up @@ -351,13 +359,16 @@ impl SharedBuildAccumulator {
/// We cannot build a partial filter from some partitions - it would incorrectly eliminate
/// valid join results. We must wait until we have complete information from ALL
/// relevant partitions before updating the dynamic filter.
#[expect(clippy::too_many_arguments)]
pub(crate) fn new_from_partition_mode(
partition_mode: PartitionMode,
left_child: &dyn ExecutionPlan,
right_child: &dyn ExecutionPlan,
dynamic_filter: Arc<DynamicFilterPhysicalExpr>,
on_right: Vec<PhysicalExprRef>,
repartition_random_state: SeededRandomState,
null_equality: NullEquality,
null_aware: bool,
) -> Self {
// Troubleshooting: If partition counts are incorrect, verify this logic matches
// the actual execution pattern in collect_build_side()
Expand Down Expand Up @@ -404,6 +415,8 @@ impl SharedBuildAccumulator {
on_right,
repartition_random_state,
probe_schema: right_child.schema(),
null_equality,
null_aware,
}
}

Expand Down Expand Up @@ -579,7 +592,8 @@ impl SharedBuildAccumulator {
if let Some(filter_expr) =
combine_membership_and_bounds(membership_expr, bounds_expr)
{
self.dynamic_filter.update(filter_expr)?;
self.dynamic_filter
.update(self.preserve_probe_nulls(filter_expr))?;
}
}
PartitionStatus::Pending => {
Expand Down Expand Up @@ -685,12 +699,51 @@ impl SharedBuildAccumulator {
)?) as Arc<dyn PhysicalExpr>
};

self.dynamic_filter.update(filter_expr)?;
self.dynamic_filter
.update(self.preserve_probe_nulls(filter_expr))?;
}
}

Ok(())
}

/// Keeps probe rows with a NULL key when the join semantics need them.
///
/// The build-side predicate drops probe rows whose key is NULL. A null-aware anti join
/// (`NOT IN`) needs that NULL to reach the join so three-valued logic can collapse the
/// result, and a null-equal join needs it to match a build-side NULL. OR-ing `key IS NULL`
/// keeps those rows while preserving the filter's selectivity for the rest; the join refines
/// whatever the widened filter lets through.
fn preserve_probe_nulls(
&self,
filter_expr: Arc<dyn PhysicalExpr>,
) -> Arc<dyn PhysicalExpr> {
if self.null_equality != NullEquality::NullEqualsNull && !self.null_aware {
return filter_expr;
}
// Only a key that can actually be NULL needs the disjunct; a NOT NULL key never widens.
// Null-aware joins are single-key; null-equal joins can be multi-key, so OR every nullable
// key. If every key is NOT NULL the filter is left untouched, at full selectivity.
let any_key_is_null = self
.on_right
.iter()
// Widen on unresolved nullability: an extra NULL row is safe, a dropped one isn't.
.filter(|key| key.nullable(&self.probe_schema).unwrap_or(true))

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we widen when we are unable to check column nullability ? i.e. unwrap_or(true).
From what I see this can only happen when on_right and schema are out of sync which seems to be an invalid state ?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a should-never-happen (as you said: keys out of sync with the probe schema), so I kept unwrap_or(true) as the safe degradation: over-widening only loses a little selectivity, while false could drop a NULL the join needs. Documented it in 9620b97.

@RatulDawar RatulDawar Jun 25, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking how an invalid state if achieved somehow should be handled, instead of silently handling it shouldn't we propagate the error further.
The fail safe check was added here #3238
Though I am not sure what's the consensus for things like these, so a commiter's input would be helpful here.

.map(|key| {
Arc::new(IsNullExpr::new(Arc::clone(key))) as Arc<dyn PhysicalExpr>
})
.reduce(|acc, is_null| {
Arc::new(BinaryExpr::new(acc, Operator::Or, is_null))
as Arc<dyn PhysicalExpr>
});
// Cheap null check first short-circuits before the costlier dynamic filter.
match any_key_is_null {
Some(any_key_is_null) => {
Arc::new(BinaryExpr::new(any_key_is_null, Operator::Or, filter_expr))
}
None => filter_expr,
}
}
}

impl fmt::Debug for SharedBuildAccumulator {
Expand Down Expand Up @@ -722,6 +775,8 @@ pub(super) fn make_partitioned_accumulator_for_test(
on_right: vec![],
repartition_random_state: SeededRandomState::with_seed(1),
probe_schema,
null_equality: NullEquality::NullEqualsNothing,
null_aware: false,
}
}

Expand All @@ -741,6 +796,7 @@ pub(super) fn completed_partitions_for_test(acc: &SharedBuildAccumulator) -> usi
#[cfg(test)]
mod tests {
use super::*;
use datafusion_physical_expr::expressions::Column;

fn partitioned_state(acc: &SharedBuildAccumulator) -> (Vec<PartitionStatus>, usize) {
let guard = acc.inner.lock();
Expand Down Expand Up @@ -810,4 +866,59 @@ mod tests {
assert!(matches!(partitions[0], PartitionStatus::CanceledUnknown));
assert_eq!(completed, 1);
}

fn null_equal_accumulator(
probe_schema: Arc<Schema>,
on_right: Vec<PhysicalExprRef>,
) -> SharedBuildAccumulator {
SharedBuildAccumulator {
inner: Mutex::new(AccumulatorState {
data: AccumulatedBuildData::Partitioned {
partitions: vec![PartitionStatus::Pending; 1],
completed_partitions: 0,
},
completion: CompletionState::Pending,
}),
completion_notify: Notify::new(),
dynamic_filter: Arc::new(DynamicFilterPhysicalExpr::new(vec![], lit(true))),
on_right,
repartition_random_state: SeededRandomState::with_seed(1),
probe_schema,
null_equality: NullEquality::NullEqualsNull,
null_aware: false,
}
}

#[test]
fn preserve_probe_nulls_only_widens_nullable_keys() {
let probe_schema = Arc::new(Schema::new(vec![
Field::new("k_nullable", DataType::Int32, true),
Field::new("k_not_null", DataType::Int32, false),
]));
let on_right: Vec<PhysicalExprRef> = vec![
Arc::new(Column::new("k_nullable", 0)),
Arc::new(Column::new("k_not_null", 1)),
];
let acc = null_equal_accumulator(probe_schema, on_right);

// Only the nullable key earns an IS NULL disjunct; the NOT NULL key is left out.
let widened = acc.preserve_probe_nulls(lit(true));
assert_eq!(format!("{widened}").matches("IS NULL").count(), 1);
}

#[test]
fn preserve_probe_nulls_leaves_all_not_null_keys_untouched() {
let probe_schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Int32, false),
]));
let on_right: Vec<PhysicalExprRef> =
vec![Arc::new(Column::new("a", 0)), Arc::new(Column::new("b", 1))];
let acc = null_equal_accumulator(probe_schema, on_right);

// Every key is NOT NULL, so there is nothing to OR in and the filter is returned as-is.
let filter = lit(true);
let result = acc.preserve_probe_nulls(Arc::clone(&filter));
assert_eq!(format!("{result}"), format!("{filter}"));
}
}
65 changes: 65 additions & 0 deletions datafusion/sqllogictest/test_files/null_aware_anti_join.slt
Original file line number Diff line number Diff line change
Expand Up @@ -451,3 +451,68 @@ DROP TABLE customers_test;

statement ok
DROP TABLE all_null_banned;

#############
## Test: dynamic filter pushdown must not drop inner (probe-side) NULLs.
## With join dynamic filter pushdown on, the build-side filter pushed to the probe scan would drop
## inner NULLs, but NOT IN three-valued logic needs them to collapse the result to zero rows. The
## in-memory VALUES scans above never apply the pushed filter, so this case needs a parquet scan.
#############

statement ok
set datafusion.optimizer.enable_join_dynamic_filter_pushdown = true;

# Row-level parquet filtering, so the pushed filter actually drops matching rows instead of only
# pruning row groups. Without this the single row group is read whole and the NULL never gets dropped.
statement ok
set datafusion.execution.parquet.pushdown_filters = true;

statement ok
CREATE TABLE asa_outer(id INT) AS VALUES (1), (2), (3);

statement ok
CREATE TABLE asa_inner(eid INT) AS VALUES (2), (NULL);

query I
COPY asa_outer TO 'test_files/scratch/null_aware_anti_join/asa_outer.parquet' STORED AS PARQUET;
----
3

query I
COPY asa_inner TO 'test_files/scratch/null_aware_anti_join/asa_inner.parquet' STORED AS PARQUET;
----
2

statement ok
CREATE EXTERNAL TABLE asa_outer_parquet(id INT)
STORED AS PARQUET
LOCATION 'test_files/scratch/null_aware_anti_join/asa_outer.parquet';

statement ok
CREATE EXTERNAL TABLE asa_inner_parquet(eid INT)
STORED AS PARQUET
LOCATION 'test_files/scratch/null_aware_anti_join/asa_inner.parquet';

# Expected: zero rows. Before the fix the pushed dynamic filter dropped inner NULLs, so the join
# wrongly returned id = 1 and id = 3.
query I
SELECT id FROM asa_outer_parquet WHERE id NOT IN (SELECT eid FROM asa_inner_parquet) ORDER BY id;
----

statement ok
DROP TABLE asa_outer;

statement ok
DROP TABLE asa_inner;

statement ok
DROP TABLE asa_outer_parquet;

statement ok
DROP TABLE asa_inner_parquet;

statement ok
RESET datafusion.execution.parquet.pushdown_filters;

statement ok
RESET datafusion.optimizer.enable_join_dynamic_filter_pushdown;
39 changes: 33 additions & 6 deletions datafusion/sqllogictest/test_files/push_down_filter_parquet.slt
Original file line number Diff line number Diff line change
Expand Up @@ -1024,10 +1024,10 @@ drop table int_probe;


########
# Dynamic filters must not be created for null-equal joins (IS NOT DISTINCT
# FROM, INTERSECT): min/max bounds and membership filters derived from the
# build side evaluate to NULL for probe-side NULL keys and would prune rows
# that can null-match a build-side NULL.
# Null-equal joins (IS NOT DISTINCT FROM, INTERSECT) keep dynamic filter pushdown.
# Min/max bounds and membership filters derived from the build side evaluate to NULL
# for a probe-side NULL key, so the pushed predicate carries an `IS NULL` disjunct that
# lets the probe NULL reach the join and null-match a build-side NULL.
########

statement ok
Expand All @@ -1049,14 +1049,14 @@ SELECT nej_build.id, nej_probe.id FROM nej_build JOIN nej_probe ON nej_build.id
11 11
NULL NULL

# No DynamicFilter predicate may appear on the probe side of a null-equal join
# The probe side now carries a DynamicFilter for a null-equal join (widened with IS NULL at runtime)
query TT
EXPLAIN SELECT nej_build.id, nej_probe.id FROM nej_build JOIN nej_probe ON nej_build.id IS NOT DISTINCT FROM nej_probe.id
----
physical_plan
01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(id@0, id@0)], NullsEqual: true
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/nej_build.parquet]]}, projection=[id], file_type=parquet
03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/nej_probe.parquet]]}, projection=[id], file_type=parquet
03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/nej_probe.parquet]]}, projection=[id], file_type=parquet, predicate=DynamicFilter [ empty ], dynamic_rg_pruning=eligible

statement ok
drop table nej_build;
Expand All @@ -1065,6 +1065,33 @@ statement ok
drop table nej_probe;


# Multi-key null-equal join: the IS NULL disjunct covers every nullable key, so a probe row with a
# NULL in either key still reaches the join and null-matches the build side.
statement ok
COPY (SELECT * FROM (VALUES (1, 10), (2, NULL), (NULL, 30)) v(a, b)) TO 'test_files/scratch/push_down_filter_parquet/mnej_probe.parquet' STORED AS PARQUET;

statement ok
COPY (SELECT * FROM (VALUES (1, 10), (2, NULL)) v(a, b)) TO 'test_files/scratch/push_down_filter_parquet/mnej_build.parquet' STORED AS PARQUET;

statement ok
CREATE EXTERNAL TABLE mnej_probe STORED AS PARQUET LOCATION 'test_files/scratch/push_down_filter_parquet/mnej_probe.parquet';

statement ok
CREATE EXTERNAL TABLE mnej_build STORED AS PARQUET LOCATION 'test_files/scratch/push_down_filter_parquet/mnej_build.parquet';

query IIII rowsort
SELECT mnej_build.a, mnej_build.b, mnej_probe.a, mnej_probe.b FROM mnej_build JOIN mnej_probe ON (mnej_build.a IS NOT DISTINCT FROM mnej_probe.a) AND (mnej_build.b IS NOT DISTINCT FROM mnej_probe.b)
----
1 10 1 10
2 NULL 2 NULL

statement ok
drop table mnej_build;

statement ok
drop table mnej_probe;


# Config reset
statement ok
RESET datafusion.explain.physical_plan_only;
Expand Down
Loading