Related:
Is your feature request related to a problem or challenge?
Grouped aggregation can require input partitioning by the group-by expressions so that all rows for the same group are processed in the same partition. Today, aggregate modes that need partitioned input require a HashPartitioned distribution on the group-by expressions.
Range partitioning can also provide the needed grouping property. If the input is range partitioned by compatible group-by expressions, equal group keys are colocated in the same partition. In those cases, requiring hash partitioning can introduce an unnecessary repartition even when the input is already partitioned correctly.
Describe the solution you'd like
Allow compatible Partitioning::Range inputs to satisfy grouped aggregation distribution requirements where the aggregate only needs key colocation, not hash partitioning specifically. This would involve:
- Reviewing aggregate modes that currently require HashPartitioned input, such as FinalPartitioned and SinglePartitioned.
- Determining whether those modes can request key-based partitioning rather than hash-specific partitioning.
- Ensuring range partitioning is accepted only when the range ordering expressions satisfy the group-by expressions.
- Preserving conservative fallback to hash repartitioning when the range partitioning is incompatible.
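As a rough illustration of the last two bullets, the sketch below checks whether a range partitioning colocates groups: every range ordering expression must also be a group-by expression, so the partition a row lands in depends only on its group key. Anything else falls back to hash repartitioning. `RangeKey`, `range_colocates_groups`, and the `descending_proven_ok` flag are hypothetical names rather than DataFusion APIs, and expressions are reduced to column names.

```rust
/// Hypothetical, simplified model of one range sort expression. DataFusion's
/// real sort and partitioning types carry much more information.
#[derive(Debug, Clone, PartialEq)]
struct RangeKey {
    column: String,
    descending: bool,
}

/// Hypothetical check: returns true when range partitioning on `ordering`
/// colocates every group of `group_by`, i.e. each range key is a group key.
///
/// `descending_proven_ok` models the open question for DESC orderings: unless
/// the caller has shown that split points are interpreted consistently with
/// the ordering, descending keys are rejected (conservative fallback).
fn range_colocates_groups(ordering: &[RangeKey], group_by: &[&str], descending_proven_ok: bool) -> bool {
    if ordering.is_empty() {
        // No range keys to reason about: stay conservative and repartition.
        return false;
    }
    ordering.iter().all(|key| {
        group_by.contains(&key.column.as_str()) && (!key.descending || descending_proven_ok)
    })
}

fn asc(c: &str) -> RangeKey {
    RangeKey { column: c.to_string(), descending: false }
}

fn desc(c: &str) -> RangeKey {
    RangeKey { column: c.to_string(), descending: true }
}

fn main() {
    // Should satisfy: groupBy=[a] and groupBy=[a, b] over Range([a ASC]).
    assert!(range_colocates_groups(&[asc("a")], &["a"], false));
    assert!(range_colocates_groups(&[asc("a")], &["a", "b"], false));
    // Should not satisfy: groupBy=[b] over Range([a ASC]).
    assert!(!range_colocates_groups(&[asc("a")], &["b"], false));
    // Should not satisfy: groupBy=[a] over Range([a ASC, b ASC]).
    assert!(!range_colocates_groups(&[asc("a"), asc("b")], &["a"], false));
    // DESC: rejected unless compatibility has been proven.
    assert!(!range_colocates_groups(&[desc("a")], &["a"], false));
    assert!(range_colocates_groups(&[desc("a")], &["a"], true));
    println!("all cases behave as described");
}
```

The `main` function walks through the examples from this issue, including the conservative treatment of `a DESC`.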
Examples that should satisfy the aggregation:
AggregateExec: mode=FinalPartitioned, groupBy=[a]
input: Partitioning::Range(ordering=[a ASC], split_points=[...])
Rows with the same a value are colocated in one range partition, so the final partitioned aggregate should not need an additional hash repartition.
AggregateExec: mode=FinalPartitioned, groupBy=[a, b]
input: Partitioning::Range(ordering=[a ASC], split_points=[...])
Rows with the same (a, b) group key are colocated because they also have the same a value. Range partitioning by a subset of the group-by keys can be sufficient for key colocation: when the partition a row lands in depends only on that subset, equal group keys cannot be split across partitions.
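The colocation argument for the two examples above can be checked with a small simulation. The sketch assumes ascending integer split points where partition `i` holds values `v` with `split[i-1] <= v < split[i]`; `range_partition` is a hypothetical helper, not the DataFusion implementation. Only `a` decides the partition, and every `(a, b)` group ends up in exactly one partition.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical helper: assign a value to a range partition given ascending
/// split points. Partition i holds values v with split[i-1] <= v < split[i].
fn range_partition(value: i64, split_points: &[i64]) -> usize {
    // partition_point returns how many split points are <= value.
    split_points.partition_point(|s| *s <= value)
}

fn main() {
    let split_points = [10, 20];
    // (a, b) rows; several rows share the same (a, b) group key.
    let rows = [(5, 1), (5, 1), (12, 7), (12, 7), (12, 8), (25, 3), (25, 3)];

    let mut partitions_per_group: HashMap<(i64, i64), HashSet<usize>> = HashMap::new();
    for &(a, b) in &rows {
        // Only `a` decides the partition, mirroring Range(ordering=[a ASC]).
        let p = range_partition(a, &split_points);
        partitions_per_group.entry((a, b)).or_default().insert(p);
    }

    // Every (a, b) group lives in exactly one partition.
    assert!(partitions_per_group.values().all(|ps| ps.len() == 1));
    println!("{} groups, each in one partition", partitions_per_group.len());
}
```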
Examples that should not satisfy the aggregation:
AggregateExec: mode=FinalPartitioned, groupBy=[b]
input: Partitioning::Range(ordering=[a ASC], split_points=[...])
The input is partitioned by a, but the aggregate groups by b, so equal b values may appear in multiple partitions.
AggregateExec: mode=FinalPartitioned, groupBy=[a]
input: Partitioning::Range(ordering=[a ASC, b ASC], split_points=[...])
The input range partitioning can split rows with the same a value across different b ranges, so rows for one a group may appear in multiple partitions.
AggregateExec: mode=FinalPartitioned, groupBy=[a]
input: Partitioning::Range(ordering=[a DESC], split_points=[...])
This should only satisfy the aggregate if the range compatibility logic treats the ordering and split point semantics as compatible for key colocation. If the ordering options cannot be proven compatible, DataFusion should repartition.
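The `[a ASC, b ASC]` counterexample can be made concrete with a composite split point that falls inside one `a` group. This sketch assumes split points are `(a, b)` tuples compared lexicographically; `range_partition_ab` is a hypothetical helper. Two rows with `a = 1` land on opposite sides of the split point `(1, 5)`.

```rust
/// Hypothetical helper: range-partition a row by the composite key (a, b)
/// using lexicographically ordered split points, as in
/// Range(ordering=[a ASC, b ASC]).
fn range_partition_ab(row: (i64, i64), split_points: &[(i64, i64)]) -> usize {
    // Rust tuples compare lexicographically, so this counts split points <= row.
    split_points.partition_point(|s| *s <= row)
}

fn main() {
    // A single split point in the middle of the a = 1 group.
    let split_points = [(1, 5)];
    let p_low = range_partition_ab((1, 0), &split_points);
    let p_high = range_partition_ab((1, 9), &split_points);

    // Both rows belong to group a = 1, yet they land in different partitions,
    // so a FinalPartitioned aggregate grouped by [a] would see a split group.
    assert_ne!(p_low, p_high);
    println!("a = 1 rows land in partitions {} and {}", p_low, p_high);
}
```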
Describe alternatives you've considered
One alternative is to wait until physical range repartition execution is implemented. However, this issue is about consuming inputs that are already range partitioned, which does not require inserting a new range repartition.
Additional context
This is similar in spirit to #23184 for joins, but aggregation is a single-input operator. The goal is to let aggregations express the distribution property they actually need.
Relevant APIs and code to review:
- AggregateExec::required_input_distribution, which currently requests HashPartitioned for partitioned aggregate modes.
- AggregateMode::{FinalPartitioned, SinglePartitioned} and any other modes that require grouped input partitioning.
- Distribution::{HashPartitioned, KeyPartitioned} for expressing whether the operator needs hash partitioning specifically or only key colocation.
- Partitioning::satisfaction and Partitioning::Range compatibility logic.
- EnforceDistribution, which decides whether to insert RepartitionExec.
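To show how a hash-specific requirement and a key-colocation requirement could lead to different repartition decisions, here is a small model. `Required`, `Provided`, `satisfies`, and `needs_repartition` are hypothetical, simplified stand-ins, not the real `Distribution`, `Partitioning`, or `EnforceDistribution` APIs. Expressions are reduced to column names, range partitioning is assumed to be ascending, and hash inputs keep today's exact-match behavior in both modes.

```rust
/// Hypothetical stand-in for what a parent operator requires from its input.
#[derive(Debug)]
enum Required {
    /// Needs hash partitioning on exactly these expressions.
    HashPartitioned(Vec<&'static str>),
    /// Only needs rows with equal keys to be colocated.
    KeyPartitioned(Vec<&'static str>),
}

/// Hypothetical stand-in for what a child provides. Range is assumed ASC.
#[derive(Debug)]
enum Provided {
    Hash(Vec<&'static str>),
    Range(Vec<&'static str>),
    Unknown,
}

/// Colocation holds when the partition depends only on group-by columns.
fn is_subset(part_keys: &[&str], group_by: &[&str]) -> bool {
    !part_keys.is_empty() && part_keys.iter().all(|k| group_by.contains(k))
}

fn satisfies(provided: &Provided, required: &Required) -> bool {
    match (required, provided) {
        // A hash-specific requirement is only met by matching hash partitioning.
        (Required::HashPartitioned(keys), Provided::Hash(h)) => h == keys,
        (Required::HashPartitioned(_), _) => false,
        // A key-colocation requirement keeps exact-match hash behavior...
        (Required::KeyPartitioned(keys), Provided::Hash(h)) => h == keys,
        // ...and additionally accepts range partitioning on group-by columns.
        (Required::KeyPartitioned(keys), Provided::Range(r)) => is_subset(r, keys),
        (Required::KeyPartitioned(_), Provided::Unknown) => false,
    }
}

/// Mimics the kind of decision EnforceDistribution makes: repartition only
/// when the provided partitioning does not satisfy the requirement.
fn needs_repartition(provided: &Provided, required: &Required) -> bool {
    !satisfies(provided, required)
}

fn main() {
    let range_a = Provided::Range(vec!["a"]);
    // Today's behavior: a hash-specific requirement forces a repartition.
    assert!(needs_repartition(&range_a, &Required::HashPartitioned(vec!["a"])));
    // Proposed: a key-colocation requirement is met by Range([a]).
    assert!(!needs_repartition(&range_a, &Required::KeyPartitioned(vec!["a", "b"])));
    // Incompatible range partitioning still falls back to repartitioning.
    assert!(needs_repartition(&range_a, &Required::KeyPartitioned(vec!["b"])));
    // Matching hash partitioning keeps working as before.
    assert!(!needs_repartition(&Provided::Hash(vec!["a"]), &Required::HashPartitioned(vec!["a"])));
    assert!(needs_repartition(&Provided::Unknown, &Required::KeyPartitioned(vec!["a"])));
    println!("ok");
}
```

In this sketch, keeping a separate hash-specific variant means operators that truly depend on hash partitioning see no change, while aggregates that only need colocation opt in to range inputs.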