Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions datafusion/physical-plan/src/joins/hash_join/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1363,6 +1363,7 @@ impl ExecutionPlan for HashJoinExec {
filter,
on_right,
repartition_random_state,
self.null_aware,
))
})))
})
Expand Down
41 changes: 38 additions & 3 deletions datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use datafusion_common::{DataFusionError, Result, ScalarValue, SharedResult};
use datafusion_expr::Operator;
use datafusion_functions::core::r#struct as struct_func;
use datafusion_physical_expr::expressions::{
BinaryExpr, CaseExpr, DynamicFilterPhysicalExpr, InListExpr, lit,
BinaryExpr, CaseExpr, DynamicFilterPhysicalExpr, InListExpr, IsNullExpr, lit,
};
use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef, ScalarFunctionExpr};

Expand Down Expand Up @@ -255,6 +255,9 @@ pub(crate) struct SharedBuildAccumulator {
repartition_random_state: SeededRandomState,
/// Schema of the probe (right) side for evaluating filter expressions
probe_schema: Arc<Schema>,
/// Null-aware anti join (`NOT IN`). A probe-side NULL must reach the join so its
/// three-valued logic can collapse the result, so the pushed filter keeps NULL rows.
null_aware: bool,
}

/// Strategy for filter pushdown (decided at collection time)
Expand Down Expand Up @@ -358,6 +361,7 @@ impl SharedBuildAccumulator {
dynamic_filter: Arc<DynamicFilterPhysicalExpr>,
on_right: Vec<PhysicalExprRef>,
repartition_random_state: SeededRandomState,
null_aware: bool,
) -> Self {
// Troubleshooting: If partition counts are incorrect, verify this logic matches
// the actual execution pattern in collect_build_side()
Expand Down Expand Up @@ -404,6 +408,7 @@ impl SharedBuildAccumulator {
on_right,
repartition_random_state,
probe_schema: right_child.schema(),
null_aware,
}
}

Expand Down Expand Up @@ -579,7 +584,8 @@ impl SharedBuildAccumulator {
if let Some(filter_expr) =
combine_membership_and_bounds(membership_expr, bounds_expr)
{
self.dynamic_filter.update(filter_expr)?;
self.dynamic_filter
.update(self.null_aware_filter(filter_expr))?;
}
}
PartitionStatus::Pending => {
Expand Down Expand Up @@ -685,12 +691,40 @@ impl SharedBuildAccumulator {
)?) as Arc<dyn PhysicalExpr>
};

self.dynamic_filter.update(filter_expr)?;
self.dynamic_filter
.update(self.null_aware_filter(filter_expr))?;
}
}

Ok(())
}

/// Adapts a pushdown filter so that a null-aware anti join (`NOT IN`) still
/// receives probe rows whose join key is NULL.
///
/// For a regular join (`null_aware == false`) the filter is handed back untouched.
/// For a null-aware anti join the result is `probe_key IS NULL OR filter_expr`:
/// the build-side predicate on its own would discard NULL-keyed probe rows, yet
/// `NOT IN` three-valued logic needs to observe them at the join. Non-NULL rows
/// are still subject to `filter_expr`, so its selectivity is retained for them.
///
/// # Panics
///
/// In debug builds, asserts that there is exactly one probe key when
/// `null_aware` is set. The first probe key is indexed directly, so an empty
/// key list panics in any build.
fn null_aware_filter(
    &self,
    filter_expr: Arc<dyn PhysicalExpr>,
) -> Arc<dyn PhysicalExpr> {
    if self.null_aware {
        debug_assert_eq!(
            self.on_right.len(),
            1,
            "null_aware anti join must have exactly one probe key"
        );
        let probe_key = Arc::clone(&self.on_right[0]);
        let key_is_null: Arc<dyn PhysicalExpr> = Arc::new(IsNullExpr::new(probe_key));
        // The inexpensive NULL test is deliberately the left operand, ahead of the
        // costlier dynamic filter.
        // NOTE(review): this relies on `BinaryExpr` evaluating `OR` left-to-right
        // with some form of short-circuiting — confirm against its implementation.
        let keep_null_or_match: Arc<dyn PhysicalExpr> =
            Arc::new(BinaryExpr::new(key_is_null, Operator::Or, filter_expr));
        keep_null_or_match
    } else {
        filter_expr
    }
}
}

impl fmt::Debug for SharedBuildAccumulator {
Expand Down Expand Up @@ -722,6 +756,7 @@ pub(super) fn make_partitioned_accumulator_for_test(
on_right: vec![],
repartition_random_state: SeededRandomState::with_seed(1),
probe_schema,
null_aware: false,
}
}

Expand Down
65 changes: 65 additions & 0 deletions datafusion/sqllogictest/test_files/null_aware_anti_join.slt
Original file line number Diff line number Diff line change
Expand Up @@ -451,3 +451,68 @@ DROP TABLE customers_test;

statement ok
DROP TABLE all_null_banned;

#############
## Test: dynamic filter pushdown must not drop inner (probe-side) NULLs.
## With join dynamic filter pushdown on, the build-side filter pushed to the probe scan would drop
## inner NULLs, but NOT IN three-valued logic needs them to collapse the result to zero rows. The
## in-memory VALUES scans above never apply the pushed filter, so this case needs a parquet scan.
#############

statement ok
set datafusion.optimizer.enable_join_dynamic_filter_pushdown = true;

# Row-level parquet filtering, so the pushed filter actually drops matching rows instead of only
# pruning row groups. Without this the single row group is read whole and the NULL never gets dropped.
statement ok
set datafusion.execution.parquet.pushdown_filters = true;

statement ok
CREATE TABLE asa_outer(id INT) AS VALUES (1), (2), (3);

statement ok
CREATE TABLE asa_inner(eid INT) AS VALUES (2), (NULL);

query I
COPY asa_outer TO 'test_files/scratch/null_aware_anti_join/asa_outer.parquet' STORED AS PARQUET;
----
3

query I
COPY asa_inner TO 'test_files/scratch/null_aware_anti_join/asa_inner.parquet' STORED AS PARQUET;
----
2

statement ok
CREATE EXTERNAL TABLE asa_outer_parquet(id INT)
STORED AS PARQUET
LOCATION 'test_files/scratch/null_aware_anti_join/asa_outer.parquet';

statement ok
CREATE EXTERNAL TABLE asa_inner_parquet(eid INT)
STORED AS PARQUET
LOCATION 'test_files/scratch/null_aware_anti_join/asa_inner.parquet';

# Expected: zero rows. Before the fix the pushed dynamic filter dropped inner NULLs, so the join
# wrongly returned id = 1 and id = 3.
query I
SELECT id FROM asa_outer_parquet WHERE id NOT IN (SELECT eid FROM asa_inner_parquet) ORDER BY id;
----

statement ok
DROP TABLE asa_outer;

statement ok
DROP TABLE asa_inner;

statement ok
DROP TABLE asa_outer_parquet;

statement ok
DROP TABLE asa_inner_parquet;

statement ok
RESET datafusion.execution.parquet.pushdown_filters;

statement ok
RESET datafusion.optimizer.enable_join_dynamic_filter_pushdown;
Loading