Skip to content

Commit fdb4e84

Browse files
authored
Add downcast_to_source method for DataSourceExec (#15416)
* Add downcast_to_source method for DataSourceExec * rename * fix conflicts * fix cippy * add the change to upgrading doc * prettier * remove * address comments
1 parent 3af31ca commit fdb4e84

File tree

7 files changed

+141
-162
lines changed

7 files changed

+141
-162
lines changed

datafusion-examples/examples/parquet_exec_visitor.rs

+9-17
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use std::sync::Arc;
1919

2020
use datafusion::datasource::file_format::parquet::ParquetFormat;
2121
use datafusion::datasource::listing::ListingOptions;
22-
use datafusion::datasource::physical_plan::{FileGroup, FileScanConfig, ParquetSource};
22+
use datafusion::datasource::physical_plan::{FileGroup, ParquetSource};
2323
use datafusion::datasource::source::DataSourceExec;
2424
use datafusion::error::DataFusionError;
2525
use datafusion::execution::context::SessionContext;
@@ -98,24 +98,16 @@ impl ExecutionPlanVisitor for ParquetExecVisitor {
9898
fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
9999
// If needed match on a specific `ExecutionPlan` node type
100100
if let Some(data_source_exec) = plan.as_any().downcast_ref::<DataSourceExec>() {
101-
let data_source = data_source_exec.data_source();
102-
if let Some(file_config) =
103-
data_source.as_any().downcast_ref::<FileScanConfig>()
101+
if let Some((file_config, _)) =
102+
data_source_exec.downcast_to_file_source::<ParquetSource>()
104103
{
105-
if file_config
106-
.file_source()
107-
.as_any()
108-
.downcast_ref::<ParquetSource>()
109-
.is_some()
110-
{
111-
self.file_groups = Some(file_config.file_groups.clone());
104+
self.file_groups = Some(file_config.file_groups.clone());
112105

113-
let metrics = match data_source_exec.metrics() {
114-
None => return Ok(true),
115-
Some(metrics) => metrics,
116-
};
117-
self.bytes_scanned = metrics.sum_by_name("bytes_scanned");
118-
}
106+
let metrics = match data_source_exec.metrics() {
107+
None => return Ok(true),
108+
Some(metrics) => metrics,
109+
};
110+
self.bytes_scanned = metrics.sum_by_name("bytes_scanned");
119111
}
120112
}
121113
Ok(true)

datafusion/core/src/test_util/parquet.rs

+4-11
Original file line numberDiff line numberDiff line change
@@ -201,18 +201,11 @@ impl TestParquetFile {
201201
/// on the first one it finds
202202
pub fn parquet_metrics(plan: &Arc<dyn ExecutionPlan>) -> Option<MetricsSet> {
203203
if let Some(data_source_exec) = plan.as_any().downcast_ref::<DataSourceExec>() {
204-
let data_source = data_source_exec.data_source();
205-
if let Some(maybe_parquet) =
206-
data_source.as_any().downcast_ref::<FileScanConfig>()
204+
if data_source_exec
205+
.downcast_to_file_source::<ParquetSource>()
206+
.is_some()
207207
{
208-
if maybe_parquet
209-
.file_source()
210-
.as_any()
211-
.downcast_ref::<ParquetSource>()
212-
.is_some()
213-
{
214-
return data_source_exec.metrics();
215-
}
208+
return data_source_exec.metrics();
216209
}
217210
}
218211

datafusion/core/tests/parquet/utils.rs

+5-12
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
//! Utilities for parquet tests
1919
20-
use datafusion::datasource::physical_plan::{FileScanConfig, ParquetSource};
20+
use datafusion::datasource::physical_plan::ParquetSource;
2121
use datafusion::datasource::source::DataSourceExec;
2222
use datafusion_physical_plan::metrics::MetricsSet;
2323
use datafusion_physical_plan::{accept, ExecutionPlan, ExecutionPlanVisitor};
@@ -48,18 +48,11 @@ impl ExecutionPlanVisitor for MetricsFinder {
4848
type Error = std::convert::Infallible;
4949
fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
5050
if let Some(data_source_exec) = plan.as_any().downcast_ref::<DataSourceExec>() {
51-
let data_source = data_source_exec.data_source();
52-
if let Some(file_config) =
53-
data_source.as_any().downcast_ref::<FileScanConfig>()
51+
if data_source_exec
52+
.downcast_to_file_source::<ParquetSource>()
53+
.is_some()
5454
{
55-
if file_config
56-
.file_source()
57-
.as_any()
58-
.downcast_ref::<ParquetSource>()
59-
.is_some()
60-
{
61-
self.metrics = data_source_exec.metrics();
62-
}
55+
self.metrics = data_source_exec.metrics();
6356
}
6457
}
6558
// stop searching once we have found the metrics

datafusion/core/tests/sql/path_partition.rs

+17-24
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use std::sync::Arc;
2525

2626
use arrow::datatypes::DataType;
2727
use datafusion::datasource::listing::ListingTableUrl;
28-
use datafusion::datasource::physical_plan::{FileScanConfig, ParquetSource};
28+
use datafusion::datasource::physical_plan::ParquetSource;
2929
use datafusion::datasource::source::DataSourceExec;
3030
use datafusion::{
3131
datasource::{
@@ -87,29 +87,22 @@ async fn parquet_partition_pruning_filter() -> Result<()> {
8787
];
8888
let exec = table.scan(&ctx.state(), None, &filters, None).await?;
8989
let data_source_exec = exec.as_any().downcast_ref::<DataSourceExec>().unwrap();
90-
let data_source = data_source_exec.data_source();
91-
let file_source = data_source
92-
.as_any()
93-
.downcast_ref::<FileScanConfig>()
94-
.unwrap();
95-
let parquet_config = file_source
96-
.file_source()
97-
.as_any()
98-
.downcast_ref::<ParquetSource>()
99-
.unwrap();
100-
let pred = parquet_config.predicate().unwrap();
101-
// Only the last filter should be pushdown to TableScan
102-
let expected = Arc::new(BinaryExpr::new(
103-
Arc::new(Column::new_with_schema("id", &exec.schema()).unwrap()),
104-
Operator::Gt,
105-
Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
106-
));
107-
108-
assert!(pred.as_any().is::<BinaryExpr>());
109-
let pred = pred.as_any().downcast_ref::<BinaryExpr>().unwrap();
110-
111-
assert_eq!(pred, expected.as_ref());
112-
90+
if let Some((_, parquet_config)) =
91+
data_source_exec.downcast_to_file_source::<ParquetSource>()
92+
{
93+
let pred = parquet_config.predicate().unwrap();
94+
// Only the last filter should be pushdown to TableScan
95+
let expected = Arc::new(BinaryExpr::new(
96+
Arc::new(Column::new_with_schema("id", &exec.schema()).unwrap()),
97+
Operator::Gt,
98+
Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
99+
));
100+
101+
assert!(pred.as_any().is::<BinaryExpr>());
102+
let pred = pred.as_any().downcast_ref::<BinaryExpr>().unwrap();
103+
104+
assert_eq!(pred, expected.as_ref());
105+
}
113106
Ok(())
114107
}
115108

datafusion/datasource/src/source.rs

+19
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use datafusion_physical_plan::{
2929
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
3030
};
3131

32+
use crate::file_scan_config::FileScanConfig;
3233
use datafusion_common::config::ConfigOptions;
3334
use datafusion_common::{Constraints, Statistics};
3435
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
@@ -230,4 +231,22 @@ impl DataSourceExec {
230231
Boundedness::Bounded,
231232
)
232233
}
234+
235+
/// Downcast the `DataSourceExec`'s `data_source` to a specific file source
236+
///
237+
/// Returns `None` if
238+
/// 1. the datasource is not scanning files (`FileScanConfig`)
239+
/// 2. The [`FileScanConfig::file_source`] is not of type `T`
240+
pub fn downcast_to_file_source<T: 'static>(&self) -> Option<(&FileScanConfig, &T)> {
241+
self.data_source()
242+
.as_any()
243+
.downcast_ref::<FileScanConfig>()
244+
.and_then(|file_scan_conf| {
245+
file_scan_conf
246+
.file_source()
247+
.as_any()
248+
.downcast_ref::<T>()
249+
.map(|source| (file_scan_conf, source))
250+
})
251+
}
233252
}

datafusion/proto/src/physical_plan/mod.rs

+20-24
Original file line numberDiff line numberDiff line change
@@ -1714,31 +1714,27 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
17141714

17151715
#[cfg(feature = "parquet")]
17161716
if let Some(exec) = plan.downcast_ref::<DataSourceExec>() {
1717-
let data_source_exec = exec.data_source();
1718-
if let Some(maybe_parquet) =
1719-
data_source_exec.as_any().downcast_ref::<FileScanConfig>()
1717+
if let Some((maybe_parquet, conf)) =
1718+
exec.downcast_to_file_source::<ParquetSource>()
17201719
{
1721-
let source = maybe_parquet.file_source();
1722-
if let Some(conf) = source.as_any().downcast_ref::<ParquetSource>() {
1723-
let predicate = conf
1724-
.predicate()
1725-
.map(|pred| serialize_physical_expr(pred, extension_codec))
1726-
.transpose()?;
1727-
return Ok(protobuf::PhysicalPlanNode {
1728-
physical_plan_type: Some(PhysicalPlanType::ParquetScan(
1729-
protobuf::ParquetScanExecNode {
1730-
base_conf: Some(serialize_file_scan_config(
1731-
maybe_parquet,
1732-
extension_codec,
1733-
)?),
1734-
predicate,
1735-
parquet_options: Some(
1736-
conf.table_parquet_options().try_into()?,
1737-
),
1738-
},
1739-
)),
1740-
});
1741-
}
1720+
let predicate = conf
1721+
.predicate()
1722+
.map(|pred| serialize_physical_expr(pred, extension_codec))
1723+
.transpose()?;
1724+
return Ok(protobuf::PhysicalPlanNode {
1725+
physical_plan_type: Some(PhysicalPlanType::ParquetScan(
1726+
protobuf::ParquetScanExecNode {
1727+
base_conf: Some(serialize_file_scan_config(
1728+
maybe_parquet,
1729+
extension_codec,
1730+
)?),
1731+
predicate,
1732+
parquet_options: Some(
1733+
conf.table_parquet_options().try_into()?,
1734+
),
1735+
},
1736+
)),
1737+
});
17421738
}
17431739
}
17441740

datafusion/substrait/src/physical_plan/producer.rs

+67-74
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use datafusion::datasource::source::DataSourceExec;
2727
use datafusion::error::{DataFusionError, Result};
2828
use datafusion::physical_plan::{displayable, ExecutionPlan};
2929

30-
use datafusion::datasource::physical_plan::{FileScanConfig, ParquetSource};
30+
use datafusion::datasource::physical_plan::ParquetSource;
3131
use substrait::proto::expression::mask_expression::{StructItem, StructSelect};
3232
use substrait::proto::expression::MaskExpression;
3333
use substrait::proto::r#type::{
@@ -52,89 +52,82 @@ pub fn to_substrait_rel(
5252
),
5353
) -> Result<Box<Rel>> {
5454
if let Some(data_source_exec) = plan.as_any().downcast_ref::<DataSourceExec>() {
55-
let data_source = data_source_exec.data_source();
56-
if let Some(file_config) = data_source.as_any().downcast_ref::<FileScanConfig>() {
57-
let is_parquet = file_config
58-
.file_source()
59-
.as_any()
60-
.downcast_ref::<ParquetSource>()
61-
.is_some();
62-
if is_parquet {
63-
let mut substrait_files = vec![];
64-
for (partition_index, files) in file_config.file_groups.iter().enumerate()
65-
{
66-
for file in files.iter() {
67-
substrait_files.push(FileOrFiles {
68-
partition_index: partition_index.try_into().unwrap(),
69-
start: 0,
70-
length: file.object_meta.size as u64,
71-
path_type: Some(PathType::UriPath(
72-
file.object_meta.location.as_ref().to_string(),
73-
)),
74-
file_format: Some(FileFormat::Parquet(ParquetReadOptions {})),
75-
});
76-
}
55+
if let Some((file_config, _)) =
56+
data_source_exec.downcast_to_file_source::<ParquetSource>()
57+
{
58+
let mut substrait_files = vec![];
59+
for (partition_index, files) in file_config.file_groups.iter().enumerate() {
60+
for file in files.iter() {
61+
substrait_files.push(FileOrFiles {
62+
partition_index: partition_index.try_into().unwrap(),
63+
start: 0,
64+
length: file.object_meta.size as u64,
65+
path_type: Some(PathType::UriPath(
66+
file.object_meta.location.as_ref().to_string(),
67+
)),
68+
file_format: Some(FileFormat::Parquet(ParquetReadOptions {})),
69+
});
7770
}
71+
}
7872

79-
let mut names = vec![];
80-
let mut types = vec![];
73+
let mut names = vec![];
74+
let mut types = vec![];
8175

82-
for field in file_config.file_schema.fields.iter() {
83-
match to_substrait_type(field.data_type(), field.is_nullable()) {
84-
Ok(t) => {
85-
names.push(field.name().clone());
86-
types.push(t);
87-
}
88-
Err(e) => return Err(e),
76+
for field in file_config.file_schema.fields.iter() {
77+
match to_substrait_type(field.data_type(), field.is_nullable()) {
78+
Ok(t) => {
79+
names.push(field.name().clone());
80+
types.push(t);
8981
}
82+
Err(e) => return Err(e),
9083
}
84+
}
9185

92-
let type_info = Struct {
93-
types,
94-
// FIXME: duckdb doesn't set this field, keep it as default variant 0.
95-
// https://github.com/duckdb/substrait/blob/b6f56643cb11d52de0e32c24a01dfd5947df62be/src/to_substrait.cpp#L1106-L1127
96-
type_variation_reference: 0,
97-
nullability: Nullability::Required.into(),
98-
};
86+
let type_info = Struct {
87+
types,
88+
// FIXME: duckdb doesn't set this field, keep it as default variant 0.
89+
// https://github.com/duckdb/substrait/blob/b6f56643cb11d52de0e32c24a01dfd5947df62be/src/to_substrait.cpp#L1106-L1127
90+
type_variation_reference: 0,
91+
nullability: Nullability::Required.into(),
92+
};
9993

100-
let mut select_struct = None;
101-
if let Some(projection) = file_config.projection.as_ref() {
102-
let struct_items = projection
103-
.iter()
104-
.map(|index| StructItem {
105-
field: *index as i32,
106-
// FIXME: duckdb sets this to None, but it's not clear why.
107-
// https://github.com/duckdb/substrait/blob/b6f56643cb11d52de0e32c24a01dfd5947df62be/src/to_substrait.cpp#L1191
108-
child: None,
109-
})
110-
.collect();
94+
let mut select_struct = None;
95+
if let Some(projection) = file_config.projection.as_ref() {
96+
let struct_items = projection
97+
.iter()
98+
.map(|index| StructItem {
99+
field: *index as i32,
100+
// FIXME: duckdb sets this to None, but it's not clear why.
101+
// https://github.com/duckdb/substrait/blob/b6f56643cb11d52de0e32c24a01dfd5947df62be/src/to_substrait.cpp#L1191
102+
child: None,
103+
})
104+
.collect();
111105

112-
select_struct = Some(StructSelect { struct_items });
113-
}
106+
select_struct = Some(StructSelect { struct_items });
107+
}
114108

115-
return Ok(Box::new(Rel {
116-
rel_type: Some(RelType::Read(Box::new(ReadRel {
117-
common: None,
118-
base_schema: Some(NamedStruct {
119-
names,
120-
r#struct: Some(type_info),
121-
}),
122-
filter: None,
123-
best_effort_filter: None,
124-
projection: Some(MaskExpression {
125-
select: select_struct,
126-
// FIXME: duckdb set this to true, but it's not clear why.
127-
// https://github.com/duckdb/substrait/blob/b6f56643cb11d52de0e32c24a01dfd5947df62be/src/to_substrait.cpp#L1186.
128-
maintain_singular_struct: true,
129-
}),
109+
return Ok(Box::new(Rel {
110+
rel_type: Some(RelType::Read(Box::new(ReadRel {
111+
common: None,
112+
base_schema: Some(NamedStruct {
113+
names,
114+
r#struct: Some(type_info),
115+
}),
116+
filter: None,
117+
best_effort_filter: None,
118+
projection: Some(MaskExpression {
119+
select: select_struct,
120+
// FIXME: duckdb set this to true, but it's not clear why.
121+
// https://github.com/duckdb/substrait/blob/b6f56643cb11d52de0e32c24a01dfd5947df62be/src/to_substrait.cpp#L1186.
122+
maintain_singular_struct: true,
123+
}),
124+
advanced_extension: None,
125+
read_type: Some(ReadType::LocalFiles(LocalFiles {
126+
items: substrait_files,
130127
advanced_extension: None,
131-
read_type: Some(ReadType::LocalFiles(LocalFiles {
132-
items: substrait_files,
133-
advanced_extension: None,
134-
})),
135-
}))),
136-
}));
137-
}
128+
})),
129+
}))),
130+
}));
138131
}
139132
}
140133
Err(DataFusionError::Substrait(format!(

0 commit comments

Comments
 (0)