Skip to content

Commit b9ef8de

Browse files
authored
DataFrame supports window function (apache#1167)
1 parent b20846e commit b9ef8de

File tree

3 files changed

+65
-32
lines changed

3 files changed

+65
-32
lines changed

datafusion/src/execution/dataframe_impl.rs

+38-5
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ use crate::arrow::util::pretty;
3535
use crate::physical_plan::{
3636
execute_stream, execute_stream_partitioned, ExecutionPlan, SendableRecordBatchStream,
3737
};
38+
use crate::sql::utils::find_window_exprs;
3839
use async_trait::async_trait;
3940

4041
/// Implementation of DataFrame API
@@ -75,10 +76,17 @@ impl DataFrame for DataFrameImpl {
7576

7677
/// Create a projection based on arbitrary expressions
7778
fn select(&self, expr_list: Vec<Expr>) -> Result<Arc<dyn DataFrame>> {
78-
let plan = LogicalPlanBuilder::from(self.to_logical_plan())
79-
.project(expr_list)?
80-
.build()?;
81-
Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan)))
79+
let window_func_exprs = find_window_exprs(&expr_list);
80+
let plan = if window_func_exprs.is_empty() {
81+
self.to_logical_plan()
82+
} else {
83+
LogicalPlanBuilder::window_plan(self.to_logical_plan(), window_func_exprs)?
84+
};
85+
let project_plan = LogicalPlanBuilder::from(plan).project(expr_list)?.build()?;
86+
Ok(Arc::new(DataFrameImpl::new(
87+
self.ctx_state.clone(),
88+
&project_plan,
89+
)))
8290
}
8391

8492
/// Create a filter based on a predicate expression
@@ -233,7 +241,7 @@ mod tests {
233241
use crate::execution::options::CsvReadOptions;
234242
use crate::logical_plan::*;
235243
use crate::physical_plan::functions::Volatility;
236-
use crate::physical_plan::ColumnarValue;
244+
use crate::physical_plan::{window_functions, ColumnarValue};
237245
use crate::{assert_batches_sorted_eq, execution::context::ExecutionContext};
238246
use crate::{physical_plan::functions::ScalarFunctionImplementation, test};
239247
use arrow::datatypes::DataType;
@@ -270,6 +278,31 @@ mod tests {
270278
Ok(())
271279
}
272280

281+
#[tokio::test]
282+
async fn select_with_window_exprs() -> Result<()> {
283+
// build plan using Table API
284+
let t = test_table().await?;
285+
let first_row = Expr::WindowFunction {
286+
fun: window_functions::WindowFunction::BuiltInWindowFunction(
287+
window_functions::BuiltInWindowFunction::FirstValue,
288+
),
289+
args: vec![col("aggregate_test_100.c1")],
290+
partition_by: vec![col("aggregate_test_100.c2")],
291+
order_by: vec![],
292+
window_frame: None,
293+
};
294+
let t2 = t.select(vec![col("c1"), first_row])?;
295+
let plan = t2.to_logical_plan();
296+
297+
let sql_plan = create_plan(
298+
"select c1, first_value(c1) over (partition by c2) from aggregate_test_100",
299+
)
300+
.await?;
301+
302+
assert_same_plan(&plan, &sql_plan);
303+
Ok(())
304+
}
305+
273306
#[tokio::test]
274307
async fn aggregate() -> Result<()> {
275308
// build plan using DataFrame API

datafusion/src/logical_plan/builder.rs

+24-1
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ use crate::logical_plan::{
4444
columnize_expr, normalize_col, normalize_cols, Column, DFField, DFSchema,
4545
DFSchemaRef, Partitioning,
4646
};
47+
use crate::sql::utils::group_window_expr_by_sort_keys;
4748

4849
/// Default table name for unnamed table
4950
pub const UNNAMED_TABLE: &str = "?table?";
@@ -401,7 +402,29 @@ impl LogicalPlanBuilder {
401402

402403
Ok(Self::from(table_scan))
403404
}
404-
405+
/// Wrap a plan in a window
406+
pub(crate) fn window_plan(
407+
input: LogicalPlan,
408+
window_exprs: Vec<Expr>,
409+
) -> Result<LogicalPlan> {
410+
let mut plan = input;
411+
let mut groups = group_window_expr_by_sort_keys(&window_exprs)?;
412+
// sort by sort_key len descending, so that more deeply sorted plans gets nested further
413+
// down as children; to further mimic the behavior of PostgreSQL, we want stable sort
414+
// and a reverse so that tieing sort keys are reversed in order; note that by this rule
415+
// if there's an empty over, it'll be at the top level
416+
groups.sort_by(|(key_a, _), (key_b, _)| key_a.len().cmp(&key_b.len()));
417+
groups.reverse();
418+
for (_, exprs) in groups {
419+
let window_exprs = exprs.into_iter().cloned().collect::<Vec<_>>();
420+
// the partition and sort itself is done at physical level, see physical_planner's
421+
// fn create_initial_plan
422+
plan = LogicalPlanBuilder::from(plan)
423+
.window(window_exprs)?
424+
.build()?;
425+
}
426+
Ok(plan)
427+
}
405428
/// Apply a projection without alias.
406429
pub fn project(&self, expr: impl IntoIterator<Item = Expr>) -> Result<Self> {
407430
self.project_with_alias(expr, None)

datafusion/src/sql/planner.rs

+3-26
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,8 @@ use super::{
5959
parser::DFParser,
6060
utils::{
6161
can_columns_satisfy_exprs, expr_as_column_expr, extract_aliases,
62-
find_aggregate_exprs, find_column_exprs, find_window_exprs,
63-
group_window_expr_by_sort_keys, rebase_expr, resolve_aliases_to_exprs,
64-
resolve_positions_to_exprs,
62+
find_aggregate_exprs, find_column_exprs, find_window_exprs, rebase_expr,
63+
resolve_aliases_to_exprs, resolve_positions_to_exprs,
6564
},
6665
};
6766
use crate::logical_plan::builder::project_with_alias;
@@ -792,7 +791,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
792791
let plan = if window_func_exprs.is_empty() {
793792
plan
794793
} else {
795-
self.window(plan, window_func_exprs)?
794+
LogicalPlanBuilder::window_plan(plan, window_func_exprs)?
796795
};
797796

798797
let plan = if select.distinct {
@@ -839,28 +838,6 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
839838
LogicalPlanBuilder::from(input).project(expr)?.build()
840839
}
841840

842-
/// Wrap a plan in a window
843-
fn window(&self, input: LogicalPlan, window_exprs: Vec<Expr>) -> Result<LogicalPlan> {
844-
let mut plan = input;
845-
let mut groups = group_window_expr_by_sort_keys(&window_exprs)?;
846-
// sort by sort_key len descending, so that more deeply sorted plans gets nested further
847-
// down as children; to further mimic the behavior of PostgreSQL, we want stable sort
848-
// and a reverse so that tieing sort keys are reversed in order; note that by this rule
849-
// if there's an empty over, it'll be at the top level
850-
groups.sort_by(|(key_a, _), (key_b, _)| key_a.len().cmp(&key_b.len()));
851-
groups.reverse();
852-
for (_, exprs) in groups {
853-
let window_exprs = exprs.into_iter().cloned().collect::<Vec<_>>();
854-
// the partition and sort itself is done at physical level, see physical_planner's
855-
// fn create_initial_plan
856-
plan = LogicalPlanBuilder::from(plan)
857-
.window(window_exprs)?
858-
.build()?;
859-
}
860-
861-
Ok(plan)
862-
}
863-
864841
/// Wrap a plan in an aggregate
865842
fn aggregate(
866843
&self,

0 commit comments

Comments
 (0)