Skip to content

Support co-partitioned range inner equi joins#23184

Open
gene-bordegaray wants to merge 2 commits into
apache:mainfrom
gene-bordegaray:gene.bordegaray/2026/06/range-partitioned-joins
Open

Support co-partitioned range inner equi joins#23184
gene-bordegaray wants to merge 2 commits into
apache:mainfrom
gene-bordegaray:gene.bordegaray/2026/06/range-partitioned-joins

Conversation

@gene-bordegaray

@gene-bordegaray gene-bordegaray commented Jun 25, 2026

Copy link
Copy Markdown
Contributor

Which issue does this PR close?

Rationale for this change

DataFusion can represent source-declared range partitioning, but partitioned hash joins still required hash partitioned inputs. So an inner join on compatible range-partitioned keys would insert unnecessary hash repartitions, even when each left/right partition already covered the same key domain.

This PR adds a partitioning requirement that means "equal key values are co-located" . I was calling this "compatibility" but found we can satisfy the requirement with looser conditions. Other systems call this "co-location" or "co-partitioning" (trino, spark). Which they (and now I am proposing) define as when both sides of a join are already partitioned so matching key values appear in corresponding partitions, so we can join partition pairs directly without repartitioning the sides.

This lets "co-partitioned" range inputs satisfy inner partitioned hash joins. This will also be applicable to other join types and operators but kept the first PR thin to keep scope more reviewable.

What changes are included in this PR?

  • Adds Distribution::KeyPartitioned(Vec<Arc<dyn PhysicalExpr>>) as a public distribution requirement.

    • HashPartitioned([a]) means rows must be partitioned by hash on a.
    • KeyPartitioned([a]) means rows with equal a values must be co-located, but the partitioning algorithm may be hash, range, or another compatible scheme.
    • Example:
      Hash([left.a], 3) satisfies KeyPartitioned([left.a])
      Range([right.b ASC], [(10), (20)], 3) satisfies KeyPartitioned([right.b])
      
  • Adds Partitioning::co_partitioned_with(...) to validate that two independently satisfying partitionings also can be paired by partition index.

    • Examples:
      • Accepted: both sides satisfy their own key requirement and have matching range boundaries.
        left:  Range([a ASC], [(10), (20)], 3), required KeyPartitioned([a])
        right: Range([b ASC], [(10), (20)], 3), required KeyPartitioned([b])
        
      • Accepted: both sides satisfy their own key requirement and have matching hash partition counts.
        left:  Hash([a], 3), required KeyPartitioned([a])
        right: Hash([b], 3), required KeyPartitioned([b])
        
      • Rejected: both sides satisfy their own key requirement, but range boundaries differ.
        left:  Range([a ASC], [(10), (20)], 3), required KeyPartitioned([a])
        right: Range([b ASC], [(15), (20)], 3), required KeyPartitioned([b])
        
      • Rejected: both sides satisfy their own key requirement, but partition counts differ.
        left:  Hash([a], 3), required KeyPartitioned([a])
        right: Hash([b], 4), required KeyPartitioned([b])
        
  • Changes inner partitioned HashJoinExec requirements from HashPartitioned to KeyPartitioned.

    • All other hash joins still require HashPartitioned for now.
  • Updates EnforceDistribution so co-partitioned range inner joins avoid repartitioning.

    • Examples:
      • Compatible range partitioning: no repartition is inserted because partitions can be joined by index.
        HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a, b)]
          DataSourceExec: output_partitioning=Range([a ASC], [(10), (20)], 3)
          DataSourceExec: output_partitioning=Range([b ASC], [(10), (20)], 3)
        
      • Incompatible range boundaries: both sides are repartitioned by hash because partition i does not represent the same key domain on both sides.
        HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a, b)]
          RepartitionExec: partitioning=Hash([a], target_partitions)
            DataSourceExec: output_partitioning=Range([a ASC], [(10), (20)], 3)
          RepartitionExec: partitioning=Hash([b], target_partitions)
            DataSourceExec: output_partitioning=Range([b ASC], [(15), (20)], 3)
        
      • Mismatched hash partition counts: both sides are forced to the target hash partition count so partition indexes line up.
        HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a, b)]
          RepartitionExec: partitioning=Hash([a], target_partitions)
            DataSourceExec: output_partitioning=Hash([a], 11)
          RepartitionExec: partitioning=Hash([b], target_partitions)
            DataSourceExec: output_partitioning=Hash([b], 12)
        
      • Non-inner joins: range inputs still get hash repartitioning because only inner partitioned hash joins use KeyPartitioned in this PR.
        HashJoinExec: mode=Partitioned, join_type=Left, on=[(a, b)]
          RepartitionExec: partitioning=Hash([a], target_partitions)
            DataSourceExec: output_partitioning=Range([a ASC], [(10), (20)], 3)
          RepartitionExec: partitioning=Hash([b], target_partitions)
            DataSourceExec: output_partitioning=Range([b ASC], [(10), (20)], 3)
        
  • Keeps partitioned dynamic filter pushdown restricted to hash-compatible routing.

    • Compatible range partitioning can satisfy the join, but dynamic filters still route by hash, so range/range partitioned joins disable dynamic filters.
  • Degrades range join output partitioning to UnknownPartitioning(n) rather than erroring. Adding this behavior would need more tests and careful thought about, I think its safert o just degrade for first PR.

Are these changes tested?

Yes.

  • KeyPartitioned satisfaction for hash and range partitioning.
  • co_partitioned_with for compatible and incompatible range/hash partitioning.
  • EnforceDistribution behavior for:
    • compatible range joins avoiding hash repartitioning
    • incompatible range bounds rehashing
    • mismatched hash partition counts rehashing
    • non-inner range joins rehashing
  • sanity checking for invalid partitioned hash joins.
  • dynamic filter rejection for range partitioning, preserved file partitions, and mismatched hash counts.
  • sqllogictest coverage for range-partitioned joins avoiding hash repartitioning and non-range joins still repartitioning.

Are there any user-facing changes?

Yes.

This PR changes public physical planning APIs:

  • Adds Distribution::KeyPartitioned.
  • Adds Partitioning::co_partitioned_with.
    • NOTE: This replaces the previous partition compatibility API with the new co-partitioning API. Since the compatibility API was never in a release I believe this is ok to do (lesson learned to not make API change until ew have definitive consumer).
  • Affects users matching exhaustively on Distribution.

@github-actions github-actions Bot added physical-expr Changes to the physical-expr crates optimizer Optimizer rules core Core DataFusion crate sqllogictest SQL Logic Tests (.slt) physical-plan Changes to the physical-plan crate labels Jun 25, 2026
@gene-bordegaray gene-bordegaray changed the title Support co-partitioned range hash joins [WIP] Support co-partitioned range hash joins Jun 25, 2026
@github-actions

github-actions Bot commented Jun 25, 2026

Copy link
Copy Markdown

Thank you for opening this pull request!

Reviewer note: cargo-semver-checks reported the current version number is not SemVer-compatible with the changes in this pull request (compared against the base branch).

Details
     Cloning apache/main
    Building datafusion v54.0.0 (current)
       Built [ 110.879s] (current)
     Parsing datafusion v54.0.0 (current)
      Parsed [   0.034s] (current)
    Building datafusion v54.0.0 (baseline)
       Built [ 111.684s] (baseline)
     Parsing datafusion v54.0.0 (baseline)
      Parsed [   0.036s] (baseline)
    Checking datafusion v54.0.0 -> v54.0.0 (no change; assume patch)
     Checked [   0.613s] 223 checks: 223 pass, 30 skip
     Summary no semver update required
    Finished [ 224.715s] datafusion
    Building datafusion-physical-expr v54.0.0 (current)
       Built [  31.088s] (current)
     Parsing datafusion-physical-expr v54.0.0 (current)
      Parsed [   0.049s] (current)
    Building datafusion-physical-expr v54.0.0 (baseline)
       Built [  29.353s] (baseline)
     Parsing datafusion-physical-expr v54.0.0 (baseline)
      Parsed [   0.050s] (baseline)
    Checking datafusion-physical-expr v54.0.0 -> v54.0.0 (no change; assume patch)
     Checked [   0.351s] 223 checks: 221 pass, 2 fail, 0 warn, 30 skip

--- failure enum_variant_added: enum variant added on exhaustive enum ---

Description:
A publicly-visible enum without #[non_exhaustive] has a new variant.
        ref: https://doc.rust-lang.org/cargo/reference/semver.html#enum-variant-new
       impl: https://github.com/obi1kenobi/cargo-semver-checks/tree/v0.48.0/src/lints/enum_variant_added.ron

Failed in:
  variant Distribution:KeyPartitioned in /home/runner/work/datafusion/datafusion/datafusion/physical-expr/src/partitioning.rs:657

--- failure inherent_method_missing: pub method removed or renamed ---

Description:
A publicly-visible method or associated fn is no longer available under its prior name. It may have been renamed or removed entirely.
        ref: https://doc.rust-lang.org/cargo/reference/semver.html#item-remove
       impl: https://github.com/obi1kenobi/cargo-semver-checks/tree/v0.48.0/src/lints/inherent_method_missing.ron

Failed in:
  RangePartitioning::compatible_with, previously in file /home/runner/work/datafusion/datafusion/target/semver-checks/git-apache_main/0d246001df27742061abe27be3b48b639f0f1259/datafusion/physical-expr/src/partitioning.rs:253
  Partitioning::compatible_with, previously in file /home/runner/work/datafusion/datafusion/target/semver-checks/git-apache_main/0d246001df27742061abe27be3b48b639f0f1259/datafusion/physical-expr/src/partitioning.rs:416

     Summary semver requires new major version: 2 major and 0 minor checks failed
    Finished [  61.851s] datafusion-physical-expr
    Building datafusion-physical-optimizer v54.0.0 (current)
       Built [  38.555s] (current)
     Parsing datafusion-physical-optimizer v54.0.0 (current)
      Parsed [   0.022s] (current)
    Building datafusion-physical-optimizer v54.0.0 (baseline)
       Built [  38.544s] (baseline)
     Parsing datafusion-physical-optimizer v54.0.0 (baseline)
      Parsed [   0.023s] (baseline)
    Checking datafusion-physical-optimizer v54.0.0 -> v54.0.0 (no change; assume patch)
     Checked [   0.125s] 223 checks: 223 pass, 30 skip
     Summary no semver update required
    Finished [  78.268s] datafusion-physical-optimizer
    Building datafusion-physical-plan v54.0.0 (current)
       Built [  36.064s] (current)
     Parsing datafusion-physical-plan v54.0.0 (current)
      Parsed [   0.130s] (current)
    Building datafusion-physical-plan v54.0.0 (baseline)
       Built [  36.732s] (baseline)
     Parsing datafusion-physical-plan v54.0.0 (baseline)
      Parsed [   0.137s] (baseline)
    Checking datafusion-physical-plan v54.0.0 -> v54.0.0 (no change; assume patch)
     Checked [   0.646s] 223 checks: 223 pass, 30 skip
     Summary no semver update required
    Finished [  74.774s] datafusion-physical-plan
    Building datafusion-sqllogictest v54.0.0 (current)
       Built [ 178.529s] (current)
     Parsing datafusion-sqllogictest v54.0.0 (current)
      Parsed [   0.022s] (current)
    Building datafusion-sqllogictest v54.0.0 (baseline)
       Built [ 179.500s] (baseline)
     Parsing datafusion-sqllogictest v54.0.0 (baseline)
      Parsed [   0.022s] (baseline)
    Checking datafusion-sqllogictest v54.0.0 -> v54.0.0 (no change; assume patch)
     Checked [   0.088s] 223 checks: 223 pass, 30 skip
     Summary no semver update required
    Finished [ 361.863s] datafusion-sqllogictest

@github-actions github-actions Bot added the auto detected api change Auto detected API change label Jun 25, 2026
/// This is optimizer policy: partitioned joins require children that can be
/// paired by partition index. Inner hash joins can reuse compatible range
/// partitioning; otherwise the existing hash repartitioning policy applies.
fn partitioned_join_distribution(

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed that simple checking logic is slightly duplicated here, here, and here

There may be a good way to extract this out but didnt want to premptively do a public change on speculation but something I am noting. Let me know if any one has suggestsion

/// left: Range(left.a ASC, [10, 20]), required KeyPartitioned(left.a)
/// right: Range(right.x ASC, [15, 20]), required KeyPartitioned(right.x)
/// ```
pub fn co_partitioned_with(

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I realized the concept was a bit off. I think this is ok if we haven't had a release... 😅

I linked the mirroring concepts that are used in trino in spark in the PR description

Lesson learned to have consumer of the public API first

@gene-bordegaray

Copy link
Copy Markdown
Contributor Author

cc: @gabotechs @stuhood

@gene-bordegaray gene-bordegaray changed the title [WIP] Support co-partitioned range hash joins Support co-partitioned range hash joins Jun 25, 2026
@gene-bordegaray gene-bordegaray force-pushed the gene.bordegaray/2026/06/range-partitioned-joins branch from 8583303 to 45d598b Compare June 25, 2026 12:59
@gene-bordegaray gene-bordegaray marked this pull request as ready for review June 25, 2026 13:04
# TEST 3b: Non-Range Join Repartitions
# The source Range partitioning does not satisfy a join on non_range_key, so
# planning still inserts Hash repartitioning.
##########

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can propbably eliminate this, almost a sanity check / nice to see after teh positive case

@stuhood

stuhood commented Jun 25, 2026

Copy link
Copy Markdown
Contributor

Amazing timing. Will get you some feedback on this by early next week! Thank you!

@gene-bordegaray

gene-bordegaray commented Jun 25, 2026

Copy link
Copy Markdown
Contributor Author

Amazing timing. Will get you some feedback on this by early next week!

@stuhood great, thank you! I also have a huge line of follow up issues to support more join types that we can split up 👍

I am trying to make smaller tickets as more of the plumbing gets in to get more people involved

@gene-bordegaray gene-bordegaray changed the title Support co-partitioned range hash joins Support co-partitioned range inner equi joins Jun 25, 2026
@gene-bordegaray gene-bordegaray force-pushed the gene.bordegaray/2026/06/range-partitioned-joins branch from 45d598b to 2fca2bb Compare June 26, 2026 15:21
@gene-bordegaray

gene-bordegaray commented Jun 26, 2026

Copy link
Copy Markdown
Contributor Author

I have added a stacked PR that shows how I imagine Aggregations will consume the KeyPartitioned API: gene-bordegaray#6 (not done yet 😄)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

auto detected api change Auto detected API change core Core DataFusion crate optimizer Optimizer rules physical-expr Changes to the physical-expr crates physical-plan Changes to the physical-plan crate sqllogictest SQL Logic Tests (.slt)

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Allow co-partitioned range inputs to satisfy inner partitioned hash joins

2 participants