refactor(hash-aggr): Migrate ordered partial/final aggregation #23181

Open
2010YOUY01 wants to merge 1 commit into apache:main from 2010YOUY01:split-aggr-ordered
@2010YOUY01

Contributor

Which issue does this PR close?

  • Closes #.

Rationale for this change

Part of #22710

This PR implements the case where the input is ordered by the group keys.

The comments in datafusion/physical-plan/src/aggregates/ordered_partial_stream.rs explain the high-level idea.

What changes are included in this PR?

  1. Implement two aggregate tables for ordered partial/final aggregates. Each provides a simple abstraction for the control flow: it is logically a map from group keys to group states, and it handles the low-level details internally
  2. Implement two streams for ordered partial/final aggregates
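
To make the "map from group keys to group states" idea concrete, here is a minimal sketch of what such a table could look like for a SUM over a single `i64` key. `OrderedSumTable`, `update`, `emit_closed`, and `finish` are hypothetical names chosen for illustration and are not the types or methods added by this PR; the real tables work on Arrow batches and generic accumulators.

```rust
/// Hypothetical sketch, not the PR's actual type: a SUM aggregate table for
/// input that is already sorted by an `i64` group key.
#[derive(Default)]
pub struct OrderedSumTable {
    /// Groups in key order; only the last one can still receive rows.
    groups: Vec<(i64, i64)>,
}

impl OrderedSumTable {
    /// Fold one sorted batch of `(key, value)` rows into the table.
    pub fn update(&mut self, batch: &[(i64, i64)]) {
        for &(key, value) in batch {
            if let Some((last_key, sum)) = self.groups.last_mut() {
                if *last_key == key {
                    // Same key as the open group: accumulate in place.
                    *sum += value;
                    continue;
                }
            }
            // New key: because the input is sorted, it starts a new group.
            self.groups.push((key, value));
        }
    }

    /// Take every group except the last, which may continue in the next batch.
    pub fn emit_closed(&mut self) -> Vec<(i64, i64)> {
        let open_from = self.groups.len().saturating_sub(1);
        self.groups.drain(..open_from).collect()
    }

    /// End of input: the last group is now complete as well.
    pub fn finish(&mut self) -> Vec<(i64, i64)> {
        std::mem::take(&mut self.groups)
    }
}
```

A stream built on such a table would only need to call `update` per input batch, `emit_closed` when it wants output, and `finish` at end of input, which is the kind of control-flow simplification the list above describes.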

Are these changes tested?

Are there any user-facing changes?

@github-actions (bot) added the labels core (Core DataFusion crate) and physical-plan (Changes to the physical-plan crate) on Jun 25, 2026
assert!(collected_running.len() > 2);
// Running should produce more chunk than the usual AggregateExec.
// Otherwise it means that we cannot generate result in running mode.
assert!(collected_running.len() > collected_usual.len());

Contributor Author

This asserts that when the same query runs on OrderedAggregateStream and AggregateStream, the former returns more batches.

That is implementation-dependent, and the test later compares the whole result row by row, so the assertion is safe to delete.
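
As a rough illustration of why the row-by-row comparison is the more robust check, the sketch below flattens two differently batched outputs before comparing them. `flatten_rows` and the tuple-based "batches" are assumptions made for this example; the actual test compares Arrow record batches.

```rust
/// Hypothetical helper: flatten batches into one row list so that the
/// comparison ignores where the batch boundaries fall.
pub fn flatten_rows(batches: &[Vec<(i64, i64)>]) -> Vec<(i64, i64)> {
    batches.iter().flatten().copied().collect()
}

/// True when two outputs contain the same rows in the same order,
/// regardless of how many batches each one was split into.
pub fn same_rows(a: &[Vec<(i64, i64)>], b: &[Vec<(i64, i64)>]) -> bool {
    flatten_rows(a) == flatten_rows(b)
}
```

With this kind of check, an ordered stream that emits three single-row batches and a hash-based stream that emits one three-row batch compare as equal, so the test no longer depends on batch counts.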

/// `k = 100`, it is safe to emit all groups with keys less than 100 because the
/// input is ordered.
///
/// ## Implementation Note

Contributor Author

Many optimizations clearly apply to this path; this note explains why the PR keeps it simple.
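
For readers skimming the thread, the emission rule quoted in the doc comment above (with the latest key `k = 100`, every group with a key below 100 is complete) can be sketched as follows. `emittable_prefix` is an illustrative helper and not a function in DataFusion; it assumes groups are stored in ascending key order.

```rust
/// Illustrative helper, not part of DataFusion: given groups sorted by key
/// and the most recent key seen in the input, return how many leading
/// groups are safe to emit. Groups with `key < latest_key` cannot receive
/// more rows because the input is ordered by the group key.
pub fn emittable_prefix(groups: &[(i64, u64)], latest_key: i64) -> usize {
    // `partition_point` binary-searches for the first group whose key is
    // not strictly less than `latest_key`.
    groups.partition_point(|(key, _)| *key < latest_key)
}
```

For groups keyed 98, 99, and 100 with `latest_key = 100`, the first two groups are emittable and the group for 100 stays open.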

}

#[tokio::test]
async fn ordered_partial_aggregate_partially_sorted_no_emit_panic() -> Result<()> {

Contributor Author

This test case is migrated from an existing unit test in row_hash.rs. A comment is left on the original test so that it is easier to cross-check when the old implementation is eventually deleted.
