@@ -16,13 +16,11 @@ use datafusion::{
16
16
physical_plan:: { displayable, explain:: ExplainExec , ExecutionPlan } ,
17
17
physical_planner:: { DefaultPhysicalPlanner , PhysicalPlanner } ,
18
18
} ;
19
+ use itertools:: Itertools ;
19
20
use optd_datafusion_repr:: {
20
- plan_nodes:: {
21
- ConstantType , OptRelNode , OptRelNodeRef , OptRelNodeTyp , PhysicalHashJoin ,
22
- PhysicalNestedLoopJoin , PlanNode ,
23
- } ,
21
+ plan_nodes:: { ConstantType , OptRelNode , PlanNode } ,
24
22
properties:: schema:: Catalog ,
25
- DatafusionOptimizer ,
23
+ DatafusionOptimizer , MemoExt ,
26
24
} ;
27
25
use std:: {
28
26
collections:: HashMap ,
@@ -89,93 +87,6 @@ pub struct OptdQueryPlanner {
89
87
pub optimizer : Arc < Mutex < Option < Box < DatafusionOptimizer > > > > ,
90
88
}
91
89
92
- #[ derive( Debug , Eq , PartialEq , Hash , PartialOrd , Ord ) ]
93
- enum JoinOrder {
94
- Table ( String ) ,
95
- HashJoin ( Box < Self > , Box < Self > ) ,
96
- NestedLoopJoin ( Box < Self > , Box < Self > ) ,
97
- }
98
-
99
- #[ allow( dead_code) ]
100
- impl JoinOrder {
101
- pub fn conv_into_logical_join_order ( & self ) -> LogicalJoinOrder {
102
- match self {
103
- JoinOrder :: Table ( name) => LogicalJoinOrder :: Table ( name. clone ( ) ) ,
104
- JoinOrder :: HashJoin ( left, right) => LogicalJoinOrder :: Join (
105
- Box :: new ( left. conv_into_logical_join_order ( ) ) ,
106
- Box :: new ( right. conv_into_logical_join_order ( ) ) ,
107
- ) ,
108
- JoinOrder :: NestedLoopJoin ( left, right) => LogicalJoinOrder :: Join (
109
- Box :: new ( left. conv_into_logical_join_order ( ) ) ,
110
- Box :: new ( right. conv_into_logical_join_order ( ) ) ,
111
- ) ,
112
- }
113
- }
114
- }
115
-
116
- #[ allow( unused) ]
117
- #[ derive( Debug , Eq , PartialEq , Hash , PartialOrd , Ord ) ]
118
- enum LogicalJoinOrder {
119
- Table ( String ) ,
120
- Join ( Box < Self > , Box < Self > ) ,
121
- }
122
-
123
- #[ allow( dead_code) ]
124
- fn get_join_order ( rel_node : OptRelNodeRef ) -> Option < JoinOrder > {
125
- match rel_node. typ {
126
- OptRelNodeTyp :: PhysicalHashJoin ( _) => {
127
- let join = PhysicalHashJoin :: from_rel_node ( rel_node. clone ( ) ) . unwrap ( ) ;
128
- let left = get_join_order ( join. left ( ) . into_rel_node ( ) ) ?;
129
- let right = get_join_order ( join. right ( ) . into_rel_node ( ) ) ?;
130
- Some ( JoinOrder :: HashJoin ( Box :: new ( left) , Box :: new ( right) ) )
131
- }
132
- OptRelNodeTyp :: PhysicalNestedLoopJoin ( _) => {
133
- let join = PhysicalNestedLoopJoin :: from_rel_node ( rel_node. clone ( ) ) . unwrap ( ) ;
134
- let left = get_join_order ( join. left ( ) . into_rel_node ( ) ) ?;
135
- let right = get_join_order ( join. right ( ) . into_rel_node ( ) ) ?;
136
- Some ( JoinOrder :: NestedLoopJoin ( Box :: new ( left) , Box :: new ( right) ) )
137
- }
138
- OptRelNodeTyp :: PhysicalScan => {
139
- let scan =
140
- optd_datafusion_repr:: plan_nodes:: PhysicalScan :: from_rel_node ( rel_node) . unwrap ( ) ;
141
- Some ( JoinOrder :: Table ( scan. table ( ) . to_string ( ) ) )
142
- }
143
- _ => {
144
- for child in & rel_node. children {
145
- if let Some ( res) = get_join_order ( child. clone ( ) ) {
146
- return Some ( res) ;
147
- }
148
- }
149
- None
150
- }
151
- }
152
- }
153
-
154
- impl std:: fmt:: Display for LogicalJoinOrder {
155
- fn fmt ( & self , f : & mut std:: fmt:: Formatter < ' _ > ) -> std:: fmt:: Result {
156
- match self {
157
- LogicalJoinOrder :: Table ( name) => write ! ( f, "{}" , name) ,
158
- LogicalJoinOrder :: Join ( left, right) => {
159
- write ! ( f, "(Join {} {})" , left, right)
160
- }
161
- }
162
- }
163
- }
164
-
165
- impl std:: fmt:: Display for JoinOrder {
166
- fn fmt ( & self , f : & mut std:: fmt:: Formatter < ' _ > ) -> std:: fmt:: Result {
167
- match self {
168
- JoinOrder :: Table ( name) => write ! ( f, "{}" , name) ,
169
- JoinOrder :: HashJoin ( left, right) => {
170
- write ! ( f, "(HashJoin {} {})" , left, right)
171
- }
172
- JoinOrder :: NestedLoopJoin ( left, right) => {
173
- write ! ( f, "(NLJ {} {})" , left, right)
174
- }
175
- }
176
- }
177
- }
178
-
179
90
impl OptdQueryPlanner {
180
91
pub fn enable_adaptive ( & self ) {
181
92
self . optimizer
@@ -247,7 +158,7 @@ impl OptdQueryPlanner {
247
158
}
248
159
}
249
160
250
- let ( _ , optimized_rel, meta) = optimizer. cascades_optimize ( optd_rel) ?;
161
+ let ( group_id , optimized_rel, meta) = optimizer. cascades_optimize ( optd_rel) ?;
251
162
252
163
if let Some ( explains) = & mut explains {
253
164
explains. push ( StringifiedPlan :: new (
@@ -258,52 +169,17 @@ impl OptdQueryPlanner {
258
169
. unwrap ( )
259
170
. explain_to_string ( if verbose { Some ( & meta) } else { None } ) ,
260
171
) ) ;
261
-
262
- // const ENABLE_JOIN_ORDER: bool = false;
263
-
264
- // if ENABLE_JOIN_ORDER {
265
- // let join_order = get_join_order(optimized_rel.clone());
266
- // explains.push(StringifiedPlan::new(
267
- // PlanType::OptimizedPhysicalPlan {
268
- // optimizer_name: "optd-join-order".to_string(),
269
- // },
270
- // if let Some(join_order) = join_order {
271
- // join_order.to_string()
272
- // } else {
273
- // "None".to_string()
274
- // },
275
- // ));
276
- // let bindings = optimizer
277
- // .optd_cascades_optimizer()
278
- // .get_all_group_bindings(group_id, true);
279
- // let mut join_orders = BTreeSet::new();
280
- // let mut logical_join_orders = BTreeSet::new();
281
- // for binding in bindings {
282
- // if let Some(join_order) = get_join_order(binding) {
283
- // logical_join_orders.insert(join_order.conv_into_logical_join_order());
284
- // join_orders.insert(join_order);
285
- // }
286
- // }
287
- // explains.push(StringifiedPlan::new(
288
- // PlanType::OptimizedPhysicalPlan {
289
- // optimizer_name: "optd-all-join-orders".to_string(),
290
- // },
291
- // join_orders.iter().map(|x| x.to_string()).join("\n"),
292
- // ));
293
- // explains.push(StringifiedPlan::new(
294
- // PlanType::OptimizedPhysicalPlan {
295
- // optimizer_name: "optd-all-logical-join-orders".to_string(),
296
- // },
297
- // logical_join_orders.iter().map(|x| x.to_string()).join("\n"),
298
- // ));
299
- // }
172
+ let join_orders = optimizer
173
+ . optd_cascades_optimizer ( )
174
+ . memo ( )
175
+ . enumerate_join_order ( group_id) ;
176
+ explains. push ( StringifiedPlan :: new (
177
+ PlanType :: OptimizedPhysicalPlan {
178
+ optimizer_name : "optd-all-logical-join-orders" . to_string ( ) ,
179
+ } ,
180
+ join_orders. iter ( ) . map ( |x| x. to_string ( ) ) . join ( "\n " ) ,
181
+ ) ) ;
300
182
}
301
- // println!(
302
- // "{} cost={}",
303
- // get_join_order(optimized_rel.clone()).unwrap(),
304
- // optimizer.optd_optimizer().get_cost_of(group_id)
305
- // );
306
- // optimizer.dump(Some(group_id));
307
183
ctx. optimizer = Some ( & optimizer) ;
308
184
let physical_plan = ctx. conv_from_optd ( optimized_rel, meta) . await ?;
309
185
if let Some ( explains) = & mut explains {
0 commit comments