@@ -3,7 +3,7 @@ use std::{collections::HashMap, sync::Arc};
3
3
use anyhow:: { bail, Context , Result } ;
4
4
use async_recursion:: async_recursion;
5
5
use datafusion:: {
6
- arrow:: datatypes:: { Schema , SchemaRef } ,
6
+ arrow:: datatypes:: { DataType , Field , Schema , SchemaRef } ,
7
7
datasource:: source_as_provider,
8
8
logical_expr:: Operator ,
9
9
physical_expr,
@@ -12,7 +12,8 @@ use datafusion::{
12
12
aggregates:: AggregateMode ,
13
13
expressions:: create_aggregate_expr,
14
14
joins:: {
15
- utils:: { ColumnIndex , JoinFilter } , CrossJoinExec , PartitionMode
15
+ utils:: { ColumnIndex , JoinFilter } ,
16
+ CrossJoinExec , PartitionMode ,
16
17
} ,
17
18
projection:: ProjectionExec ,
18
19
AggregateExpr , ExecutionPlan , PhysicalExpr ,
@@ -23,14 +24,36 @@ use optd_datafusion_repr::{
23
24
plan_nodes:: {
24
25
BinOpExpr , BinOpType , ColumnRefExpr , ConstantExpr , ConstantType , Expr , FuncExpr , FuncType ,
25
26
JoinType , LogOpExpr , LogOpType , OptRelNode , OptRelNodeRef , OptRelNodeTyp , PhysicalAgg ,
26
- PhysicalFilter , PhysicalHashJoin , PhysicalNestedLoopJoin , PhysicalProjection , PhysicalScan ,
27
- PhysicalSort , PlanNode , SortOrderExpr , SortOrderType ,
27
+ PhysicalEmptyRelation , PhysicalFilter , PhysicalHashJoin , PhysicalNestedLoopJoin ,
28
+ PhysicalProjection , PhysicalScan , PhysicalSort , PlanNode , SortOrderExpr , SortOrderType ,
28
29
} ,
30
+ properties:: schema:: Schema as OptdSchema ,
29
31
PhysicalCollector ,
30
32
} ;
31
33
32
34
use crate :: { physical_collector:: CollectorExec , OptdPlanContext } ;
33
35
36
+ // TODO: current DataType and ConstantType are not 1 to 1 mapping
37
+ // optd schema stores constantType from data type in catalog.get
38
+ // for decimal128, the precision is lost
39
+ fn from_optd_schema ( optd_schema : & OptdSchema ) -> Schema {
40
+ let match_type = |typ : & ConstantType | match typ {
41
+ ConstantType :: Any => unimplemented ! ( ) ,
42
+ ConstantType :: Bool => DataType :: Boolean ,
43
+ ConstantType :: Int => DataType :: Int64 ,
44
+ ConstantType :: Date => DataType :: Date32 ,
45
+ ConstantType :: Decimal => DataType :: Float64 ,
46
+ ConstantType :: Utf8String => DataType :: Utf8 ,
47
+ } ;
48
+ let fields: Vec < _ > = optd_schema
49
+ . 0
50
+ . iter ( )
51
+ . enumerate ( )
52
+ . map ( |( i, typ) | Field :: new ( & format ! ( "c{}" , i) , match_type ( typ) , false ) )
53
+ . collect ( ) ;
54
+ Schema :: new ( fields)
55
+ }
56
+
34
57
impl OptdPlanContext < ' _ > {
35
58
#[ async_recursion]
36
59
async fn from_optd_table_scan (
@@ -317,7 +340,8 @@ impl OptdPlanContext<'_> {
317
340
let physical_expr = self . from_optd_expr ( node. cond ( ) , & Arc :: new ( filter_schema. clone ( ) ) ) ?;
318
341
319
342
if let JoinType :: Cross = node. join_type ( ) {
320
- return Ok ( Arc :: new ( CrossJoinExec :: new ( left_exec, right_exec) ) as Arc < dyn ExecutionPlan + ' static > ) ;
343
+ return Ok ( Arc :: new ( CrossJoinExec :: new ( left_exec, right_exec) )
344
+ as Arc < dyn ExecutionPlan + ' static > ) ;
321
345
}
322
346
323
347
let join_type = match node. join_type ( ) {
@@ -398,6 +422,10 @@ impl OptdPlanContext<'_> {
398
422
399
423
#[ async_recursion]
400
424
async fn from_optd_plan_node ( & mut self , node : PlanNode ) -> Result < Arc < dyn ExecutionPlan > > {
425
+ let mut schema = OptdSchema ( vec ! [ ] ) ;
426
+ if node. typ ( ) == OptRelNodeTyp :: PhysicalEmptyRelation {
427
+ schema = node. schema ( self . optimizer . unwrap ( ) . optd_optimizer ( ) ) ;
428
+ }
401
429
let rel_node = node. into_rel_node ( ) ;
402
430
let rel_node_dbg = rel_node. clone ( ) ;
403
431
let result = match & rel_node. typ {
@@ -440,6 +468,14 @@ impl OptdPlanContext<'_> {
440
468
self . optimizer . as_ref ( ) . unwrap ( ) . runtime_statistics . clone ( ) ,
441
469
) ) as Arc < dyn ExecutionPlan > )
442
470
}
471
+ OptRelNodeTyp :: PhysicalEmptyRelation => {
472
+ let physical_node = PhysicalEmptyRelation :: from_rel_node ( rel_node) . unwrap ( ) ;
473
+ let datafusion_schema: Schema = from_optd_schema ( & schema) ;
474
+ Ok ( Arc :: new ( datafusion:: physical_plan:: empty:: EmptyExec :: new (
475
+ physical_node. produce_one_row ( ) ,
476
+ Arc :: new ( datafusion_schema) ,
477
+ ) ) as Arc < dyn ExecutionPlan > )
478
+ }
443
479
typ => unimplemented ! ( "{}" , typ) ,
444
480
} ;
445
481
result. with_context ( || format ! ( "when processing {}" , rel_node_dbg) )
0 commit comments