Skip to content
This repository was archived by the owner on Jan 7, 2025. It is now read-only.

Commit 81dfe73

Browse files
authored
[Logical Optimizer] align schema (#61)
cmu-db/optd#56 --------- Signed-off-by: AveryQi115 <[email protected]>
1 parent fbfd4db commit 81dfe73

File tree

4 files changed: 54 additions (+54) and 27 deletions (-27).

4 files changed: 54 additions (+54) and 27 deletions (-27).

Diff for: optd-datafusion-bridge/src/from_optd.rs

+11 -9
Original file line number | Diff line number | Diff line change
@@ -36,7 +36,7 @@ use crate::{physical_collector::CollectorExec, OptdPlanContext};
3636
// TODO: current DataType and ConstantType are not 1 to 1 mapping
3737
// optd schema stores constantType from data type in catalog.get
3838
// for decimal128, the precision is lost
39-
fn from_optd_schema(optd_schema: &OptdSchema) -> Schema {
39+
fn from_optd_schema(optd_schema: OptdSchema) -> Schema {
4040
let match_type = |typ: &ConstantType| match typ {
4141
ConstantType::Any => unimplemented!(),
4242
ConstantType::Bool => DataType::Boolean,
@@ -52,12 +52,14 @@ fn from_optd_schema(optd_schema: &OptdSchema) -> Schema {
5252
ConstantType::Decimal => DataType::Float64,
5353
ConstantType::Utf8String => DataType::Utf8,
5454
};
55-
let fields: Vec<_> = optd_schema
56-
.0
57-
.iter()
58-
.enumerate()
59-
.map(|(i, typ)| Field::new(format!("c{}", i), match_type(typ), false))
60-
.collect();
55+
let mut fields = Vec::with_capacity(optd_schema.len());
56+
for field in optd_schema.fields {
57+
fields.push(Field::new(
58+
field.name,
59+
match_type(&field.typ),
60+
field.nullable,
61+
));
62+
}
6163
Schema::new(fields)
6264
}
6365

@@ -437,7 +439,7 @@ impl OptdPlanContext<'_> {
437439

438440
#[async_recursion]
439441
async fn conv_from_optd_plan_node(&mut self, node: PlanNode) -> Result<Arc<dyn ExecutionPlan>> {
440-
let mut schema = OptdSchema(vec![]);
442+
let mut schema = OptdSchema { fields: vec![] };
441443
if node.typ() == OptRelNodeTyp::PhysicalEmptyRelation {
442444
schema = node.schema(self.optimizer.unwrap().optd_optimizer());
443445
}
@@ -485,7 +487,7 @@ impl OptdPlanContext<'_> {
485487
}
486488
OptRelNodeTyp::PhysicalEmptyRelation => {
487489
let physical_node = PhysicalEmptyRelation::from_rel_node(rel_node).unwrap();
488-
let datafusion_schema: Schema = from_optd_schema(&schema);
490+
let datafusion_schema: Schema = from_optd_schema(schema);
489491
Ok(Arc::new(datafusion::physical_plan::empty::EmptyExec::new(
490492
physical_node.produce_one_row(),
491493
Arc::new(datafusion_schema),

Diff for: optd-datafusion-bridge/src/lib.rs

+12 -5
Original file line number | Diff line number | Diff line change
@@ -61,9 +61,10 @@ impl Catalog for DatafusionCatalog {
6161
let catalog = self.catalog.catalog("datafusion").unwrap();
6262
let schema = catalog.schema("public").unwrap();
6363
let table = futures_lite::future::block_on(schema.table(name.as_ref())).unwrap();
64-
let fields = table.schema();
65-
let mut optd_schema = vec![];
66-
for field in fields.fields() {
64+
let schema = table.schema();
65+
let fields = schema.fields();
66+
let mut optd_fields = Vec::with_capacity(fields.len());
67+
for field in fields {
6768
let dt = match field.data_type() {
6869
DataType::Date32 => ConstantType::Date,
6970
DataType::Int32 => ConstantType::Int32,
@@ -73,9 +74,15 @@ impl Catalog for DatafusionCatalog {
7374
DataType::Decimal128(_, _) => ConstantType::Decimal,
7475
dt => unimplemented!("{:?}", dt),
7576
};
76-
optd_schema.push(dt);
77+
optd_fields.push(optd_datafusion_repr::properties::schema::Field {
78+
name: field.name().to_string(),
79+
typ: dt,
80+
nullable: field.is_nullable(),
81+
});
82+
}
83+
optd_datafusion_repr::properties::schema::Schema {
84+
fields: optd_fields,
7785
}
78-
optd_datafusion_repr::properties::schema::Schema(optd_schema)
7986
}
8087
}
8188

Diff for: optd-datafusion-repr/src/properties/schema.rs

+26 -6
Original file line number | Diff line number | Diff line change
@@ -3,12 +3,19 @@ use optd_core::property::PropertyBuilder;
33
use crate::plan_nodes::{ConstantType, OptRelNodeTyp};
44

55
#[derive(Clone, Debug)]
6-
pub struct Schema(pub Vec<ConstantType>);
6+
pub struct Field {
7+
pub name: String,
8+
pub typ: ConstantType,
9+
pub nullable: bool,
10+
}
11+
#[derive(Clone, Debug)]
12+
pub struct Schema {
13+
pub fields: Vec<Field>,
14+
}
715

8-
// TODO: add names, nullable to schema
916
impl Schema {
1017
pub fn len(&self) -> usize {
11-
self.0.len()
18+
self.fields.len()
1219
}
1320

1421
pub fn is_empty(&self) -> bool {
@@ -48,11 +55,24 @@ impl PropertyBuilder<OptRelNodeTyp> for SchemaPropertyBuilder {
4855
OptRelNodeTyp::Filter => children[0].clone(),
4956
OptRelNodeTyp::Join(_) => {
5057
let mut schema = children[0].clone();
51-
schema.0.extend(children[1].clone().0);
58+
let schema2 = children[1].clone();
59+
schema.fields.extend(schema2.fields);
5260
schema
5361
}
54-
OptRelNodeTyp::List => Schema(vec![ConstantType::Any; children.len()]),
55-
_ => Schema(vec![]),
62+
OptRelNodeTyp::List => {
63+
// TODO: calculate real is_nullable for aggregations
64+
Schema {
65+
fields: vec![
66+
Field {
67+
name: "unnamed".to_string(),
68+
typ: ConstantType::Any,
69+
nullable: true
70+
};
71+
children.len()
72+
],
73+
}
74+
}
75+
_ => Schema { fields: vec![] },
5676
}
5777
}
5878

Diff for: optd-datafusion-repr/src/rules/joins.rs

+5 -7
Original file line number | Diff line number | Diff line change
@@ -68,7 +68,7 @@ fn apply_join_commute(
6868
cond,
6969
JoinType::Inner,
7070
);
71-
let mut proj_expr = Vec::with_capacity(left_schema.0.len() + right_schema.0.len());
71+
let mut proj_expr = Vec::with_capacity(left_schema.len() + right_schema.len());
7272
for i in 0..left_schema.len() {
7373
proj_expr.push(ColumnRefExpr::new(right_schema.len() + i).into_expr());
7474
}
@@ -218,21 +218,19 @@ fn apply_hash_join(
218218
let Some(mut right_expr) = ColumnRefExpr::from_rel_node(right_expr.into_rel_node()) else {
219219
return vec![];
220220
};
221-
let can_convert = if left_expr.index() < left_schema.0.len()
222-
&& right_expr.index() >= left_schema.0.len()
221+
let can_convert = if left_expr.index() < left_schema.len()
222+
&& right_expr.index() >= left_schema.len()
223223
{
224224
true
225-
} else if right_expr.index() < left_schema.0.len()
226-
&& left_expr.index() >= left_schema.0.len()
227-
{
225+
} else if right_expr.index() < left_schema.len() && left_expr.index() >= left_schema.len() {
228226
(left_expr, right_expr) = (right_expr, left_expr);
229227
true
230228
} else {
231229
false
232230
};
233231

234232
if can_convert {
235-
let right_expr = ColumnRefExpr::new(right_expr.index() - left_schema.0.len());
233+
let right_expr = ColumnRefExpr::new(right_expr.index() - left_schema.len());
236234
let node = PhysicalHashJoin::new(
237235
PlanNode::from_group(left.into()),
238236
PlanNode::from_group(right.into()),

0 commit comments

Comments
 (0)