Skip to content
This repository was archived by the owner on Jan 7, 2025. It is now read-only.

Commit dce1dca

Browse files
committed
add empty relation; add cost calculation for empty relation; add schema conversion
Signed-off-by: AveryQi115 <[email protected]>
1 parent 4f313ba commit dce1dca

File tree

12 files changed

+211
-25
lines changed

12 files changed

+211
-25
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

optd-datafusion-bridge/src/from_optd.rs

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,9 @@
1-
use std::{collections::HashMap, sync::Arc};
1+
use std::{collections::HashMap, fmt::Display, sync::Arc};
22

33
use anyhow::{bail, Context, Result};
44
use async_recursion::async_recursion;
55
use datafusion::{
6-
arrow::datatypes::{Schema, SchemaRef},
7-
datasource::source_as_provider,
8-
logical_expr::Operator,
9-
physical_expr,
10-
physical_plan::{
6+
arrow::datatypes::{Schema, SchemaRef}, catalog::schema, common::DFSchema, datasource::source_as_provider, logical_expr::Operator, physical_expr, physical_plan::{
117
self,
128
aggregates::AggregateMode,
139
expressions::create_aggregate_expr,
@@ -16,17 +12,17 @@ use datafusion::{
1612
},
1713
projection::ProjectionExec,
1814
AggregateExpr, ExecutionPlan, PhysicalExpr,
19-
},
20-
scalar::ScalarValue,
15+
}, scalar::ScalarValue
2116
};
2217
use optd_datafusion_repr::{
2318
plan_nodes::{
2419
BinOpExpr, BinOpType, ColumnRefExpr, ConstantExpr, ConstantType, Expr, FuncExpr, FuncType,
2520
JoinType, LogOpExpr, LogOpType, OptRelNode, OptRelNodeRef, OptRelNodeTyp, PhysicalAgg,
2621
PhysicalFilter, PhysicalHashJoin, PhysicalNestedLoopJoin, PhysicalProjection, PhysicalScan,
27-
PhysicalSort, PlanNode, SortOrderExpr, SortOrderType,
22+
PhysicalSort, PhysicalEmptyRelation, PlanNode, SortOrderExpr, SortOrderType,
2823
},
2924
PhysicalCollector,
25+
properties::schema::Schema as OptdSchema,
3026
};
3127

3228
use crate::{physical_collector::CollectorExec, OptdPlanContext};
@@ -398,6 +394,10 @@ impl OptdPlanContext<'_> {
398394

399395
#[async_recursion]
400396
async fn from_optd_plan_node(&mut self, node: PlanNode) -> Result<Arc<dyn ExecutionPlan>> {
397+
let mut schema = OptdSchema(vec![]);
398+
if node.typ() == OptRelNodeTyp::PhysicalEmptyRelation {
399+
schema = node.schema(self.optimizer.unwrap().optd_optimizer());
400+
}
401401
let rel_node = node.into_rel_node();
402402
let rel_node_dbg = rel_node.clone();
403403
let result = match &rel_node.typ {
@@ -440,12 +440,21 @@ impl OptdPlanContext<'_> {
440440
self.optimizer.as_ref().unwrap().runtime_statistics.clone(),
441441
)) as Arc<dyn ExecutionPlan>)
442442
}
443+
OptRelNodeTyp::PhysicalEmptyRelation => {
444+
let physical_node = PhysicalEmptyRelation::from_rel_node(rel_node).unwrap();
445+
let datafusion_schema : Schema = schema.into();
446+
Ok(Arc::new(datafusion::physical_plan::empty::EmptyExec::new(
447+
physical_node.produce_one_row(),
448+
Arc::new(datafusion_schema),
449+
)) as Arc<dyn ExecutionPlan>)
450+
}
443451
typ => unimplemented!("{}", typ),
444452
};
445453
result.with_context(|| format!("when processing {}", rel_node_dbg))
446454
}
447455

448456
pub async fn from_optd(&mut self, root_rel: OptRelNodeRef) -> Result<Arc<dyn ExecutionPlan>> {
457+
println!("{}", root_rel);
449458
self.from_optd_plan_node(PlanNode::from_rel_node(root_rel).unwrap())
450459
.await
451460
}

optd-datafusion-bridge/src/into_optd.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use optd_core::rel_node::RelNode;
88
use optd_datafusion_repr::plan_nodes::{
99
BinOpExpr, BinOpType, ColumnRefExpr, ConstantExpr, Expr, ExprList, FuncExpr, FuncType,
1010
JoinType, LogOpExpr, LogOpType, LogicalAgg, LogicalFilter, LogicalJoin, LogicalProjection,
11-
LogicalScan, LogicalSort, OptRelNode, OptRelNodeRef, OptRelNodeTyp, PlanNode, SortOrderExpr,
11+
LogicalScan, LogicalSort, LogicalEmptyRelation, OptRelNode, OptRelNodeRef, OptRelNodeTyp, PlanNode, SortOrderExpr,
1212
SortOrderType,
1313
};
1414

@@ -69,6 +69,10 @@ impl OptdPlanContext<'_> {
6969
let x = x.as_ref().unwrap();
7070
Ok(ConstantExpr::string(x).into_expr())
7171
}
72+
ScalarValue::Int64(x) => {
73+
let x = x.as_ref().unwrap();
74+
Ok(ConstantExpr::int(*x as i64).into_expr())
75+
}
7276
ScalarValue::Date32(x) => {
7377
let x = x.as_ref().unwrap();
7478
Ok(ConstantExpr::date(*x as i64).into_expr())
@@ -236,6 +240,10 @@ impl OptdPlanContext<'_> {
236240
Ok(LogicalJoin::new(left, right, ConstantExpr::bool(true).into_expr(), JoinType::Cross))
237241
}
238242

243+
fn into_optd_empty_relation(&mut self, node: &logical_plan::EmptyRelation) -> Result<LogicalEmptyRelation> {
244+
Ok(LogicalEmptyRelation::new(node.produce_one_row))
245+
}
246+
239247
fn into_optd_plan_node(&mut self, node: &LogicalPlan) -> Result<PlanNode> {
240248
let node = match node {
241249
LogicalPlan::TableScan(node) => self.into_optd_table_scan(node)?.into_plan_node(),
@@ -246,6 +254,7 @@ impl OptdPlanContext<'_> {
246254
LogicalPlan::Join(node) => self.into_optd_join(node)?.into_plan_node(),
247255
LogicalPlan::Filter(node) => self.into_optd_filter(node)?.into_plan_node(),
248256
LogicalPlan::CrossJoin(node) => self.into_optd_cross_join(node)?.into_plan_node(),
257+
LogicalPlan::EmptyRelation(node) => self.into_optd_empty_relation(node)?.into_plan_node(),
249258
_ => bail!(
250259
"unsupported plan node: {}",
251260
format!("{:?}", node).split('\n').next().unwrap()

optd-datafusion-repr/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ edition = "2021"
77

88
[dependencies]
99
anyhow = "1"
10+
arrow-schema = "*"
11+
datafusion = "32.0.0"
1012
num-traits = "0.2"
1113
num-derive = "0.2"
1214
tracing = "0.1"

optd-datafusion-repr/src/cost/base_cost.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ impl CostModel<OptRelNodeTyp> for OptCostModel {
106106
.unwrap_or(1) as f64;
107107
Self::cost(row_cnt, 0.0, row_cnt)
108108
}
109+
OptRelNodeTyp::PhysicalEmptyRelation => Self::cost(1.0, 0.01, 0.0),
109110
OptRelNodeTyp::PhysicalFilter => {
110111
let (row_cnt, _, _) = Self::cost_tuple(&children[0]);
111112
let (_, compute_cost, _) = Self::cost_tuple(&children[1]);

optd-datafusion-repr/src/plan_nodes.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ pub(super) mod macros;
99
mod projection;
1010
mod scan;
1111
mod sort;
12+
mod empty_relation;
1213

1314
use std::sync::Arc;
1415

@@ -29,6 +30,7 @@ use pretty_xmlish::{Pretty, PrettyConfig};
2930
pub use projection::{LogicalProjection, PhysicalProjection};
3031
pub use scan::{LogicalScan, PhysicalScan};
3132
pub use sort::{LogicalSort, PhysicalSort};
33+
pub use empty_relation::{LogicalEmptyRelation, PhysicalEmptyRelation};
3234

3335
use crate::{
3436
adaptive::PhysicalCollector,
@@ -48,6 +50,7 @@ pub enum OptRelNodeTyp {
4850
Sort,
4951
Agg,
5052
Apply(ApplyType),
53+
EmptyRelation,
5154
// Physical plan nodes
5255
PhysicalProjection,
5356
PhysicalFilter,
@@ -56,6 +59,7 @@ pub enum OptRelNodeTyp {
5659
PhysicalAgg,
5760
PhysicalHashJoin(JoinType),
5861
PhysicalNestedLoopJoin(JoinType),
62+
PhysicalEmptyRelation,
5963
PhysicalCollector(GroupId), // only produced after optimization is done
6064
// Expressions
6165
Constant(ConstantType),
@@ -78,6 +82,7 @@ impl OptRelNodeTyp {
7882
| Self::Apply(_)
7983
| Self::Sort
8084
| Self::Agg
85+
| Self::EmptyRelation
8186
| Self::PhysicalProjection
8287
| Self::PhysicalFilter
8388
| Self::PhysicalNestedLoopJoin(_)
@@ -86,6 +91,7 @@ impl OptRelNodeTyp {
8691
| Self::PhysicalAgg
8792
| Self::PhysicalHashJoin(_)
8893
| Self::PhysicalCollector(_)
94+
| Self::PhysicalEmptyRelation
8995
)
9096
}
9197

@@ -120,6 +126,7 @@ impl RelNodeTyp for OptRelNodeTyp {
120126
| Self::Apply(_)
121127
| Self::Sort
122128
| Self::Agg
129+
| Self::EmptyRelation
123130
)
124131
}
125132

@@ -194,7 +201,7 @@ impl PlanNode {
194201
self.0.typ.clone()
195202
}
196203

197-
pub fn schema(&self, optimizer: CascadesOptimizer<OptRelNodeTyp>) -> Schema {
204+
pub fn schema(&self, optimizer: &CascadesOptimizer<OptRelNodeTyp>) -> Schema {
198205
let group_id = optimizer.resolve_group_id(self.0.clone());
199206
optimizer.get_property_by_group::<SchemaPropertyBuilder>(group_id, 0)
200207
}
@@ -300,6 +307,9 @@ pub fn explain(rel_node: OptRelNodeRef) -> Pretty<'static> {
300307
OptRelNodeTyp::Apply(_) => LogicalApply::from_rel_node(rel_node)
301308
.unwrap()
302309
.dispatch_explain(),
310+
OptRelNodeTyp::EmptyRelation => LogicalEmptyRelation::from_rel_node(rel_node)
311+
.unwrap()
312+
.dispatch_explain(),
303313
OptRelNodeTyp::PhysicalFilter => PhysicalFilter::from_rel_node(rel_node)
304314
.unwrap()
305315
.dispatch_explain(),
@@ -345,6 +355,9 @@ pub fn explain(rel_node: OptRelNodeRef) -> Pretty<'static> {
345355
OptRelNodeTyp::PhysicalCollector(group_id) => PhysicalCollector::from_rel_node(rel_node)
346356
.unwrap()
347357
.dispatch_explain(),
358+
OptRelNodeTyp::PhysicalEmptyRelation => PhysicalEmptyRelation::from_rel_node(rel_node)
359+
.unwrap()
360+
.dispatch_explain(),
348361
}
349362
}
350363

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
use pretty_xmlish::Pretty;
2+
3+
use optd_core::rel_node::{RelNode, Value};
4+
5+
use super::{replace_typ, OptRelNode, OptRelNodeRef, OptRelNodeTyp, PlanNode};
6+
7+
#[derive(Clone, Debug)]
8+
pub struct LogicalEmptyRelation(pub PlanNode);
9+
10+
impl OptRelNode for LogicalEmptyRelation {
11+
fn into_rel_node(self) -> OptRelNodeRef {
12+
self.0.into_rel_node()
13+
}
14+
15+
fn from_rel_node(rel_node: OptRelNodeRef) -> Option<Self> {
16+
if rel_node.typ != OptRelNodeTyp::EmptyRelation {
17+
return None;
18+
}
19+
PlanNode::from_rel_node(rel_node).map(Self)
20+
}
21+
22+
fn dispatch_explain(&self) -> Pretty<'static> {
23+
Pretty::childless_record(
24+
"LogicalEmptyRelation",
25+
vec![("produce_one_row", self.produce_one_row().to_string().into())],
26+
)
27+
}
28+
}
29+
30+
impl LogicalEmptyRelation {
31+
pub fn new(produce_one_row: bool) -> LogicalEmptyRelation {
32+
LogicalEmptyRelation(PlanNode(
33+
RelNode {
34+
typ: OptRelNodeTyp::EmptyRelation,
35+
children: vec![],
36+
data: Some(Value::Bool(produce_one_row)),
37+
}
38+
.into(),
39+
))
40+
}
41+
42+
pub fn produce_one_row(&self) -> bool {
43+
self.clone().into_rel_node().data.as_ref().unwrap().as_bool()
44+
}
45+
}
46+
47+
#[derive(Clone, Debug)]
48+
pub struct PhysicalEmptyRelation(pub PlanNode);
49+
50+
impl OptRelNode for PhysicalEmptyRelation {
51+
fn into_rel_node(self) -> OptRelNodeRef {
52+
replace_typ(self.0.into_rel_node(), OptRelNodeTyp::PhysicalEmptyRelation)
53+
}
54+
55+
fn from_rel_node(rel_node: OptRelNodeRef) -> Option<Self> {
56+
if rel_node.typ != OptRelNodeTyp::PhysicalEmptyRelation {
57+
return None;
58+
}
59+
PlanNode::from_rel_node(rel_node).map(Self)
60+
}
61+
62+
fn dispatch_explain(&self) -> Pretty<'static> {
63+
Pretty::childless_record(
64+
"PhysicalEmptyRelation",
65+
vec![("produce_one_row", self.produce_one_row().to_string().into())],
66+
)
67+
}
68+
}
69+
70+
impl PhysicalEmptyRelation {
71+
pub fn new(node: PlanNode) -> PhysicalEmptyRelation {
72+
Self(node)
73+
}
74+
75+
pub fn produce_one_row(&self) -> bool {
76+
self.clone().into_rel_node().data.as_ref().unwrap().as_bool()
77+
}
78+
}

optd-datafusion-repr/src/properties/schema.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,41 @@ use std::{
66
use optd_core::property::PropertyBuilder;
77

88
use crate::plan_nodes::{ConstantType, OptRelNodeTyp};
9+
use datafusion::arrow::datatypes::{DataType, Field, Schema as DatafusionSchema};
910

1011
#[derive(Clone, Debug)]
1112
pub struct Schema(pub Vec<ConstantType>);
1213

14+
// TODO: add names, nullable to schema
1315
impl Schema {
1416
pub fn len(&self) -> usize {
1517
self.0.len()
1618
}
1719
}
1820

21+
impl Into<DatafusionSchema> for Schema {
22+
// TODO: current DataType and ConstantType are not 1 to 1 mapping
23+
// optd schema stores constantType from data type in catalog.get
24+
// for decimal128, the precision is lost
25+
fn into(self) -> DatafusionSchema {
26+
let match_type = |typ: &ConstantType| match typ {
27+
ConstantType::Any => unimplemented!(),
28+
ConstantType::Bool => DataType::Boolean,
29+
ConstantType::Int => DataType::Int64,
30+
ConstantType::Date => DataType::Date32,
31+
ConstantType::Decimal => DataType::Float64,
32+
ConstantType::Utf8String => DataType::Utf8,
33+
};
34+
let fields : Vec<_> = self
35+
.0
36+
.iter()
37+
.enumerate()
38+
.map(|(i, typ)| Field::new(&format!("c{}", i), match_type(typ), false))
39+
.collect();
40+
DatafusionSchema::new(fields)
41+
}
42+
}
43+
1944
pub trait Catalog: Send + Sync + 'static {
2045
fn get(&self, name: &str) -> Schema;
2146
}

optd-datafusion-repr/src/rules/physical.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ impl PhysicalConversionRule {
4040
Arc::new(PhysicalConversionRule::new(OptRelNodeTyp::Filter)),
4141
Arc::new(PhysicalConversionRule::new(OptRelNodeTyp::Sort)),
4242
Arc::new(PhysicalConversionRule::new(OptRelNodeTyp::Agg)),
43+
Arc::new(PhysicalConversionRule::new(OptRelNodeTyp::EmptyRelation)),
4344
]
4445
}
4546
}
@@ -117,6 +118,14 @@ impl<O: Optimizer<OptRelNodeTyp>> Rule<OptRelNodeTyp, O> for PhysicalConversionR
117118
};
118119
vec![node]
119120
}
121+
OptRelNodeTyp::EmptyRelation => {
122+
let node = RelNode {
123+
typ: OptRelNodeTyp::PhysicalEmptyRelation,
124+
children,
125+
data,
126+
};
127+
vec![node]
128+
}
120129
_ => vec![],
121130
}
122131
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
-- (no id or description)
2+
create table t1(t1v1 int, t1v2 int);
3+
create table t2(t2v1 int, t2v3 int);
4+
insert into t1 values (0, 0), (1, 1), (2, 2);
5+
insert into t2 values (0, 200), (1, 201), (2, 202);
6+
7+
/*
8+
3
9+
3
10+
*/
11+
12+
-- Test whether the optimizer handles empty relation correctly.
13+
select 64 + 1;
14+
select 64 + 1 from t1;
15+
16+
/*
17+
65
18+
65
19+
65
20+
65
21+
*/
22+
23+
-- Test whether the optimizer eliminates join to empty relation
24+
select * from t1 inner join t2 on false;
25+
select 64+1 from t1 inner join t2 on false;
26+
select 64+1 from t1 inner join t2 on 1=0;
27+
28+
/*
29+
*/
30+

0 commit comments

Comments
 (0)