Skip to content
This repository was archived by the owner on Jan 7, 2025. It is now read-only.

Commit 4f313ba

Browse files
authored
feat(bridge): cross join support (#46)
* feat: support cross join
  Signed-off-by: Yuchen Liang <[email protected]>
* feat: disable datafusion optimizer
  Signed-off-by: Yuchen Liang <[email protected]>
* fix
  Signed-off-by: Yuchen Liang <[email protected]>
* add implementation rule and use CrossExec
  Signed-off-by: Yuchen Liang <[email protected]>
---------
Signed-off-by: Yuchen Liang <[email protected]>
1 parent 5e067be commit 4f313ba

File tree

6 files changed

+44
-15
lines changed

6 files changed

+44
-15
lines changed

datafusion-optd-cli/src/main.rs

+12-3
Original file line number | Diff line number | Diff line change
@@ -134,6 +134,8 @@ struct Args {
134134
default_value = "40"
135135
)]
136136
maxrows: MaxRows,
137+
#[clap(long, help = "Turn on datafusion logical optimizer before optd")]
138+
enable_logical: bool,
137139
}
138140

139141
#[tokio::main]
@@ -156,6 +158,11 @@ pub async fn main() -> Result<()> {
156158
};
157159

158160
let mut session_config = SessionConfig::from_env()?.with_information_schema(true);
161+
162+
if !args.enable_logical {
163+
session_config.options_mut().optimizer.max_passes = 0;
164+
}
165+
159166

160167
if let Some(batch_size) = args.batch_size {
161168
session_config = session_config.with_batch_size(batch_size);
@@ -187,9 +194,11 @@ pub async fn main() -> Result<()> {
187194
let mut ctx = {
188195
let mut state =
189196
SessionState::new_with_config_rt(session_config.clone(), Arc::new(runtime_env));
190-
// clean up optimizer rules so that we can plug in our own optimizer
191-
// state = state.with_optimizer_rules(vec![]);
192-
// state = state.with_physical_optimizer_rules(vec![]);
197+
if !args.enable_logical {
198+
// clean up optimizer rules so that we can plug in our own optimizer
199+
state = state.with_optimizer_rules(vec![]);
200+
state = state.with_physical_optimizer_rules(vec![]);
201+
}
193202
// use optd-bridge query planner
194203
let optimizer = DatafusionOptimizer::new_physical(Box::new(DatafusionCatalog::new(
195204
state.catalog_list(),

optd-adaptive-demo/src/bin/optd-adaptive-tpch-q8.rs

+6-1
Original file line number | Diff line number | Diff line change
@@ -19,7 +19,8 @@ static GLOBAL: MiMalloc = MiMalloc;
1919

2020
#[tokio::main]
2121
async fn main() -> Result<()> {
22-
let session_config = SessionConfig::from_env()?.with_information_schema(true);
22+
let mut session_config = SessionConfig::from_env()?.with_information_schema(true);
23+
session_config.options_mut().optimizer.max_passes = 0;
2324

2425
let rn_config = RuntimeConfig::new();
2526
let runtime_env = RuntimeEnv::new(rn_config.clone())?;
@@ -30,6 +31,10 @@ async fn main() -> Result<()> {
3031
let optimizer = DatafusionOptimizer::new_physical(Box::new(DatafusionCatalog::new(
3132
state.catalog_list(),
3233
)));
34+
// clean up optimizer rules so that we can plug in our own optimizer
35+
state = state.with_optimizer_rules(vec![]);
36+
state = state.with_physical_optimizer_rules(vec![]);
37+
// use optd-bridge query planner
3338
state = state.with_query_planner(Arc::new(OptdQueryPlanner::new(optimizer)));
3439
SessionContext::new_with_state(state)
3540
};

optd-datafusion-bridge/src/from_optd.rs

+8-8
Original file line number | Diff line number | Diff line change
@@ -3,21 +3,16 @@ use std::{collections::HashMap, sync::Arc};
33
use anyhow::{bail, Context, Result};
44
use async_recursion::async_recursion;
55
use datafusion::{
6-
arrow::{
7-
compute::kernels::filter,
8-
datatypes::{Schema, SchemaRef},
9-
},
6+
arrow::datatypes::{Schema, SchemaRef},
107
datasource::source_as_provider,
118
logical_expr::Operator,
129
physical_expr,
1310
physical_plan::{
1411
self,
1512
aggregates::AggregateMode,
16-
explain::ExplainExec,
1713
expressions::create_aggregate_expr,
1814
joins::{
19-
utils::{ColumnIndex, JoinFilter},
20-
PartitionMode,
15+
utils::{ColumnIndex, JoinFilter}, CrossJoinExec, PartitionMode
2116
},
2217
projection::ProjectionExec,
2318
AggregateExpr, ExecutionPlan, PhysicalExpr,
@@ -31,7 +26,7 @@ use optd_datafusion_repr::{
3126
PhysicalFilter, PhysicalHashJoin, PhysicalNestedLoopJoin, PhysicalProjection, PhysicalScan,
3227
PhysicalSort, PlanNode, SortOrderExpr, SortOrderType,
3328
},
34-
PhysicalCollector, Value,
29+
PhysicalCollector,
3530
};
3631

3732
use crate::{physical_collector::CollectorExec, OptdPlanContext};
@@ -320,6 +315,11 @@ impl OptdPlanContext<'_> {
320315
};
321316

322317
let physical_expr = self.from_optd_expr(node.cond(), &Arc::new(filter_schema.clone()))?;
318+
319+
if let JoinType::Cross = node.join_type() {
320+
return Ok(Arc::new(CrossJoinExec::new(left_exec, right_exec)) as Arc<dyn ExecutionPlan + 'static>);
321+
}
322+
323323
let join_type = match node.join_type() {
324324
JoinType::Inner => datafusion::logical_expr::JoinType::Inner,
325325
JoinType::LeftOuter => datafusion::logical_expr::JoinType::Left,

optd-datafusion-bridge/src/into_optd.rs

+9-2
Original file line number | Diff line number | Diff line change
@@ -113,8 +113,8 @@ impl OptdPlanContext<'_> {
113113
expr,
114114
)
115115
.into_expr())
116-
}
117-
_ => bail!("{:?}", expr),
116+
}
117+
_ => bail!("Unsupported expression: {:?}", expr),
118118
}
119119
}
120120

@@ -230,6 +230,12 @@ impl OptdPlanContext<'_> {
230230
}
231231
}
232232

233+
/// Converts a DataFusion `CrossJoin` logical node into an optd `LogicalJoin`.
///
/// Both children are converted recursively through `into_optd_plan_node`;
/// an error from either conversion is propagated to the caller via `?`.
/// The resulting join is tagged `JoinType::Cross` and is given a constant
/// `true` expression as its condition.
fn into_optd_cross_join(&mut self, node: &logical_plan::CrossJoin) -> Result<LogicalJoin> {
234+
let left = self.into_optd_plan_node(node.left.as_ref())?;
235+
let right = self.into_optd_plan_node(node.right.as_ref())?;
236+
// A cross join carries no predicate of its own, so a literal `true` is
// supplied as the condition argument that `LogicalJoin::new` takes.
Ok(LogicalJoin::new(left, right, ConstantExpr::bool(true).into_expr(), JoinType::Cross))
237+
}
238+
233239
fn into_optd_plan_node(&mut self, node: &LogicalPlan) -> Result<PlanNode> {
234240
let node = match node {
235241
LogicalPlan::TableScan(node) => self.into_optd_table_scan(node)?.into_plan_node(),
@@ -239,6 +245,7 @@ impl OptdPlanContext<'_> {
239245
LogicalPlan::SubqueryAlias(node) => self.into_optd_plan_node(node.input.as_ref())?,
240246
LogicalPlan::Join(node) => self.into_optd_join(node)?.into_plan_node(),
241247
LogicalPlan::Filter(node) => self.into_optd_filter(node)?.into_plan_node(),
248+
LogicalPlan::CrossJoin(node) => self.into_optd_cross_join(node)?.into_plan_node(),
242249
_ => bail!(
243250
"unsupported plan node: {}",
244251
format!("{:?}", node).split('\n').next().unwrap()

optd-datafusion-repr/src/rules/physical.rs

+3
Original file line number | Diff line number | Diff line change
@@ -34,6 +34,9 @@ impl PhysicalConversionRule {
3434
Arc::new(PhysicalConversionRule::new(OptRelNodeTyp::Join(
3535
JoinType::LeftOuter,
3636
))),
37+
Arc::new(PhysicalConversionRule::new(OptRelNodeTyp::Join(
38+
JoinType::Cross,
39+
))),
3740
Arc::new(PhysicalConversionRule::new(OptRelNodeTyp::Filter)),
3841
Arc::new(PhysicalConversionRule::new(OptRelNodeTyp::Sort)),
3942
Arc::new(PhysicalConversionRule::new(OptRelNodeTyp::Agg)),

optd-sqlplannertest/src/lib.rs

+6-1
Original file line number | Diff line number | Diff line change
@@ -24,7 +24,8 @@ pub struct DatafusionDb {
2424

2525
impl DatafusionDb {
2626
pub async fn new() -> Result<Self> {
27-
let session_config = SessionConfig::from_env()?.with_information_schema(true);
27+
let mut session_config = SessionConfig::from_env()?.with_information_schema(true);
28+
session_config.options_mut().optimizer.max_passes = 0;
2829

2930
let rn_config = RuntimeConfig::new();
3031
let runtime_env = RuntimeEnv::new(rn_config.clone())?;
@@ -35,6 +36,10 @@ impl DatafusionDb {
3536
let optimizer = DatafusionOptimizer::new_physical(Box::new(DatafusionCatalog::new(
3637
state.catalog_list(),
3738
)));
39+
// clean up optimizer rules so that we can plug in our own optimizer
40+
state = state.with_optimizer_rules(vec![]);
41+
state = state.with_physical_optimizer_rules(vec![]);
42+
// use optd-bridge query planner
3843
state = state.with_query_planner(Arc::new(OptdQueryPlanner::new(optimizer)));
3944
SessionContext::new_with_state(state)
4045
};

0 commit comments

Comments
 (0)