Skip to content

Commit af73fdf

Browse files
authored
refactor(optimizer): optimize trait and pipeline (#17712)
* refactor(optimizer): optimize trait
* add optimizer trait
* prepare for pipeline
* memo & optimize_query to use pipeline
* impl cascade memo trait
1 parent edc7222 commit af73fdf

File tree

24 files changed

+447
-172
lines changed

24 files changed

+447
-172
lines changed

src/query/service/tests/it/sql/planner/optimizer/agg_index_query_rewrite.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -394,7 +394,7 @@ async fn test_query_rewrite_impl(format: &str) -> Result<()> {
394394

395395
let opt_ctx = OptimizerContext::new(ctx.clone(), metadata.clone());
396396
let result = RecursiveOptimizer::new(opt_ctx.clone(), &[RuleID::TryApplyAggIndex])
397-
.optimize(&query)?;
397+
.optimize_sync(&query)?;
398398
let agg_index = find_push_down_index_info(&result)?;
399399
assert_eq!(
400400
suite.is_matched,
@@ -449,7 +449,8 @@ async fn plan_sql(
449449
{
450450
let s_expr = if optimize {
451451
let opt_ctx = OptimizerContext::new(ctx.clone(), metadata.clone());
452-
RecursiveOptimizer::new(opt_ctx.clone(), &DEFAULT_REWRITE_RULES).optimize(&s_expr)?
452+
RecursiveOptimizer::new(opt_ctx.clone(), &DEFAULT_REWRITE_RULES)
453+
.optimize_sync(&s_expr)?
453454
} else {
454455
*s_expr
455456
};

src/query/sql/src/planner/binder/bind_mutation/mutation_expression.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -313,7 +313,7 @@ impl MutationExpression {
313313
let opt_ctx =
314314
OptimizerContext::new(binder.ctx.clone(), binder.metadata.clone());
315315
let mut rewriter = SubqueryRewriter::new(opt_ctx, None);
316-
let s_expr = rewriter.optimize(&s_expr)?;
316+
let s_expr = rewriter.optimize_sync(&s_expr)?;
317317

318318
Ok(MutationExpressionBindResult {
319319
input: s_expr,

src/query/sql/src/planner/optimizer/cost/cost.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ impl AddAssign for Cost {
5555
}
5656
}
5757

58-
pub trait CostModel: Send {
58+
pub trait CostModel: Send + Sync {
5959
/// Compute cost of given `MExpr`(children are not encapsulated).
6060
fn compute_cost(&self, memo: &Memo, m_expr: &MExpr) -> Result<Cost>;
6161
}

src/query/sql/src/planner/optimizer/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,14 @@ mod cost;
1616
pub mod ir;
1717
#[allow(clippy::module_inception)]
1818
mod optimizer;
19+
mod optimizer_api;
1920
mod optimizer_context;
2021
pub mod optimizers;
22+
pub mod pipeline;
2123
mod statistics;
22-
mod util;
2324

2425
pub use optimizer::optimize;
2526
pub use optimizer::optimize_query;
27+
pub use optimizer_api::Optimizer;
2628
pub use optimizer_context::OptimizerContext;
2729
pub use optimizers::rule::agg_index;
28-
pub use util::contains_local_table_scan;

src/query/sql/src/planner/optimizer/optimizer.rs

Lines changed: 73 additions & 120 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,8 @@ use crate::optimizer::optimizers::rule::RuleID;
3737
use crate::optimizer::optimizers::rule::DEFAULT_REWRITE_RULES;
3838
use crate::optimizer::optimizers::CascadesOptimizer;
3939
use crate::optimizer::optimizers::DPhpy;
40+
use crate::optimizer::pipeline::OptimizerPipeline;
4041
use crate::optimizer::statistics::CollectStatisticsOptimizer;
41-
use crate::optimizer::util::contains_local_table_scan;
42-
use crate::optimizer::util::contains_warehouse_table_scan;
4342
use crate::optimizer::OptimizerContext;
4443
use crate::plans::ConstantTableScan;
4544
use crate::plans::CopyIntoLocationPlan;
@@ -92,11 +91,8 @@ pub async fn optimize(opt_ctx: Arc<OptimizerContext>, plan: Plan) -> Result<Plan
9291
));
9392
};
9493

95-
let mut s_expr = s_expr;
96-
if s_expr.contain_subquery() {
97-
s_expr =
98-
Box::new(SubqueryRewriter::new(opt_ctx.clone(), None).optimize(&s_expr)?);
99-
}
94+
let s_expr =
95+
Box::new(SubqueryRewriter::new(opt_ctx.clone(), None).optimize_sync(&s_expr)?);
10096
Ok(Plan::Explain {
10197
kind,
10298
config,
@@ -241,129 +237,86 @@ pub async fn optimize(opt_ctx: Arc<OptimizerContext>, plan: Plan) -> Result<Plan
241237
}
242238
}
243239

244-
pub async fn optimize_query(opt_ctx: Arc<OptimizerContext>, mut s_expr: SExpr) -> Result<SExpr> {
245-
let metadata = opt_ctx.get_metadata();
246-
247-
// 1. Configure distributed optimization based on table types
248-
if contains_local_table_scan(&s_expr, &metadata) {
249-
opt_ctx.set_enable_distributed_optimization(false);
250-
info!("Disable distributed optimization due to local table scan.");
251-
} else if contains_warehouse_table_scan(&s_expr, &metadata) {
252-
let warehouse = opt_ctx.get_table_ctx().get_warehouse_cluster().await?;
253-
254-
if !warehouse.is_empty() {
255-
opt_ctx.set_enable_distributed_optimization(true);
256-
info!("Enable distributed optimization due to warehouse table scan.");
257-
}
258-
}
259-
260-
// 2. Eliminate subqueries by rewriting them into more efficient forms
261-
s_expr = SubqueryRewriter::new(opt_ctx.clone(), None).optimize(&s_expr)?;
262-
263-
// 3. Apply statistics aggregation to gather and propagate statistics
264-
s_expr = RuleStatsAggregateOptimizer::new(opt_ctx.clone())
265-
.optimize(&s_expr)
266-
.await?;
267-
268-
// 4. Collect statistics for SExpr nodes to support cost estimation
269-
s_expr = CollectStatisticsOptimizer::new(opt_ctx.clone())
270-
.optimize(&s_expr)
271-
.await?;
272-
273-
// 5. Normalize aggregate, it should be executed before RuleSplitAggregate.
274-
s_expr = RuleNormalizeAggregateOptimizer::new().optimize(&s_expr)?;
275-
276-
// 6. Pull up and infer filter.
277-
s_expr = PullUpFilterOptimizer::new(opt_ctx.clone()).optimize(&s_expr)?;
278-
279-
// 7. Run default rewrite rules
280-
s_expr = RecursiveOptimizer::new(opt_ctx.clone(), &DEFAULT_REWRITE_RULES).optimize(&s_expr)?;
281-
282-
// 8. Run post rewrite rules
283-
s_expr =
284-
RecursiveOptimizer::new(opt_ctx.clone(), &[RuleID::SplitAggregate]).optimize(&s_expr)?;
285-
286-
// 9. Apply DPhyp algorithm for cost-based join reordering
287-
s_expr = DPhpy::new(opt_ctx.clone()).optimize(&s_expr).await?;
288-
289-
// 10. After join reorder, Convert some single join to inner join.
290-
s_expr = SingleToInnerOptimizer::new().optimize(&s_expr)?;
291-
292-
// 11. Deduplicate join conditions.
293-
s_expr = DeduplicateJoinConditionOptimizer::new().optimize(&s_expr)?;
294-
295-
// 12. Apply join commutativity to further optimize join ordering
296-
if opt_ctx.get_enable_join_reorder() {
297-
s_expr = RecursiveOptimizer::new(opt_ctx.clone(), [RuleID::CommuteJoin].as_slice())
298-
.optimize(&s_expr)?;
299-
}
300-
301-
// 13. Cascades optimizer may fail due to timeout, fallback to heuristic optimizer in this case.
302-
s_expr = CascadesOptimizer::new(opt_ctx.clone())?.optimize(s_expr)?;
303-
304-
// 14. Eliminate unnecessary scalar calculations to clean up the final plan
305-
if !opt_ctx.get_planning_agg_index() {
306-
s_expr = RecursiveOptimizer::new(opt_ctx.clone(), [RuleID::EliminateEvalScalar].as_slice())
307-
.optimize(&s_expr)?;
308-
}
240+
pub async fn optimize_query(opt_ctx: Arc<OptimizerContext>, s_expr: SExpr) -> Result<SExpr> {
241+
let mut pipeline = OptimizerPipeline::new(opt_ctx.clone(), s_expr.clone())
242+
.await?
243+
// 2. Eliminate subqueries by rewriting them into more efficient form
244+
.add(SubqueryRewriter::new(opt_ctx.clone(), None))
245+
// 3. Apply statistics aggregation to gather and propagate statistics
246+
.add(RuleStatsAggregateOptimizer::new(opt_ctx.clone()))
247+
// 4. Collect statistics for SExpr nodes to support cost estimation
248+
.add(CollectStatisticsOptimizer::new(opt_ctx.clone()))
249+
// 5. Normalize aggregate, it should be executed before RuleSplitAggregate.
250+
.add(RuleNormalizeAggregateOptimizer::new())
251+
// 6. Pull up and infer filter.
252+
.add(PullUpFilterOptimizer::new(opt_ctx.clone()))
253+
// 7. Run default rewrite rules
254+
.add(RecursiveOptimizer::new(
255+
opt_ctx.clone(),
256+
&DEFAULT_REWRITE_RULES,
257+
))
258+
// 8. Run post rewrite rules
259+
.add(RecursiveOptimizer::new(opt_ctx.clone(), &[
260+
RuleID::SplitAggregate,
261+
]))
262+
// 9. Apply DPhyp algorithm for cost-based join reordering
263+
.add(DPhpy::new(opt_ctx.clone()))
264+
// 10. After join reorder, Convert some single join to inner join.
265+
.add(SingleToInnerOptimizer::new())
266+
// 11. Deduplicate join conditions.
267+
.add(DeduplicateJoinConditionOptimizer::new())
268+
// 12. Apply join commutativity to further optimize join ordering
269+
.add_if(
270+
opt_ctx.get_enable_join_reorder(),
271+
RecursiveOptimizer::new(opt_ctx.clone(), [RuleID::CommuteJoin].as_slice()),
272+
)
273+
// 13. Cascades optimizer may fail due to timeout, fallback to heuristic optimizer in this case.
274+
.add(CascadesOptimizer::new(opt_ctx.clone())?)
275+
// 14. Eliminate unnecessary scalar calculations to clean up the final plan
276+
.add_if(
277+
!opt_ctx.get_planning_agg_index(),
278+
RecursiveOptimizer::new(opt_ctx.clone(), [RuleID::EliminateEvalScalar].as_slice()),
279+
);
280+
281+
// 15. Execute the pipeline
282+
let s_expr = pipeline.execute().await?;
309283

310284
Ok(s_expr)
311285
}
312286

313-
// TODO(leiysky): reuse the optimization logic with `optimize_query`
314-
async fn get_optimized_memo(opt_ctx: Arc<OptimizerContext>, mut s_expr: SExpr) -> Result<Memo> {
315-
let metadata = opt_ctx.get_metadata();
316-
let _table_ctx = opt_ctx.get_table_ctx();
317-
318-
if contains_local_table_scan(&s_expr, &metadata) {
319-
opt_ctx.set_enable_distributed_optimization(false);
320-
info!("Disable distributed optimization due to local table scan.");
321-
} else if contains_warehouse_table_scan(&s_expr, &metadata) {
322-
let warehouse = opt_ctx.get_table_ctx().get_warehouse_cluster().await?;
323-
324-
if !warehouse.is_empty() {
325-
opt_ctx.set_enable_distributed_optimization(true);
326-
info!("Enable distributed optimization due to warehouse table scan.");
327-
}
328-
}
329-
330-
// Decorrelate subqueries, after this step, there should be no subquery in the expression.
331-
if s_expr.contain_subquery() {
332-
s_expr = SubqueryRewriter::new(opt_ctx.clone(), None).optimize(&s_expr)?;
333-
}
334-
335-
s_expr = RuleStatsAggregateOptimizer::new(opt_ctx.clone())
336-
.optimize(&s_expr)
337-
.await?;
338-
339-
// Collect statistics for each leaf node in SExpr.
340-
s_expr = CollectStatisticsOptimizer::new(opt_ctx.clone())
341-
.optimize(&s_expr)
342-
.await?;
343-
344-
// Pull up and infer filter.
345-
s_expr = PullUpFilterOptimizer::new(opt_ctx.clone()).optimize(&s_expr)?;
346-
// Run default rewrite rules
347-
s_expr = RecursiveOptimizer::new(opt_ctx.clone(), &DEFAULT_REWRITE_RULES).optimize(&s_expr)?;
348-
// Run post rewrite rules
349-
s_expr =
350-
RecursiveOptimizer::new(opt_ctx.clone(), &[RuleID::SplitAggregate]).optimize(&s_expr)?;
351-
352-
// Cost based optimization
353-
if opt_ctx.get_enable_dphyp() && opt_ctx.get_enable_join_reorder() {
354-
s_expr = DPhpy::new(opt_ctx.clone()).optimize(&s_expr).await?;
355-
}
356-
let mut cascades = CascadesOptimizer::new(opt_ctx.clone())?;
357-
cascades.optimize(s_expr)?;
358-
359-
Ok(cascades.memo)
287+
async fn get_optimized_memo(opt_ctx: Arc<OptimizerContext>, s_expr: SExpr) -> Result<Memo> {
288+
let mut pipeline = OptimizerPipeline::new(opt_ctx.clone(), s_expr.clone())
289+
.await?
290+
// Decorrelate subqueries, after this step, there should be no subquery in the expression.
291+
.add(SubqueryRewriter::new(opt_ctx.clone(), None))
292+
.add(RuleStatsAggregateOptimizer::new(opt_ctx.clone()))
293+
// Collect statistics for each leaf node in SExpr.
294+
.add(CollectStatisticsOptimizer::new(opt_ctx.clone()))
295+
// Pull up and infer filter.
296+
.add(PullUpFilterOptimizer::new(opt_ctx.clone()))
297+
// Run default rewrite rules
298+
.add(RecursiveOptimizer::new(
299+
opt_ctx.clone(),
300+
&DEFAULT_REWRITE_RULES,
301+
))
302+
// Run post rewrite rules
303+
.add(RecursiveOptimizer::new(opt_ctx.clone(), &[
304+
RuleID::SplitAggregate,
305+
]))
306+
// Cost based optimization
307+
.add(DPhpy::new(opt_ctx.clone()))
308+
.add(CascadesOptimizer::new(opt_ctx.clone())?);
309+
310+
let _s_expr = pipeline.execute().await?;
311+
312+
Ok(pipeline.memo())
360313
}
361314

362315
async fn optimize_mutation(opt_ctx: Arc<OptimizerContext>, s_expr: SExpr) -> Result<Plan> {
363316
// Optimize the input plan.
364317
let mut input_s_expr = optimize_query(opt_ctx.clone(), s_expr.child(0)?.clone()).await?;
365318
input_s_expr = RecursiveOptimizer::new(opt_ctx.clone(), &[RuleID::MergeFilterIntoMutation])
366-
.optimize(&input_s_expr)?;
319+
.optimize_sync(&input_s_expr)?;
367320

368321
// For distributed query optimization, we need to remove the Exchange operator at the top of the plan.
369322
if let &RelOperator::Exchange(_) = input_s_expr.plan() {
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use databend_common_exception::Result;
16+
17+
use crate::optimizer::ir::Memo;
18+
use crate::optimizer::ir::SExpr;
19+
20+
/// Trait defining the interface for query optimizers.
21+
22+
#[async_trait::async_trait]
23+
pub trait Optimizer: Send + Sync {
24+
/// Optimize the given expression and return the optimized version.
25+
async fn optimize(&mut self, expr: &SExpr) -> Result<SExpr>;
26+
27+
/// Returns a unique identifier for this optimizer.
28+
fn name(&self) -> &'static str;
29+
30+
/// Get the memo if this optimizer maintains one.
31+
/// Default implementation returns None for optimizers that don't use a memo.
32+
fn memo(&self) -> Option<&Memo> {
33+
None
34+
}
35+
}

0 commit comments

Comments
 (0)