Skip to content

Commit 4a3f70c

Browse files
authored
Add FileScanConfigBuilder (#15352)
* WIP: Add `FileScanConfigBuilder` and switch some cases
* Fmt
* More fmt
* Clean `FileScanConfig::build`
* Clean `FileScanConfig::new`
* Fix csv + fmt
* More fixes
* Remove pub
* Remove todo
* Add usage example
* Fix input type
* Add `from_data_source`
* Fmt
* Add docs for `with_source`
1 parent cd96b26 commit 4a3f70c

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+912
-303
lines changed

datafusion-examples/examples/advanced_parquet_index.rs

+9-6
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use datafusion::common::{
3030
use datafusion::datasource::listing::PartitionedFile;
3131
use datafusion::datasource::physical_plan::parquet::ParquetAccessPlan;
3232
use datafusion::datasource::physical_plan::{
33-
FileMeta, FileScanConfig, ParquetFileReaderFactory, ParquetSource,
33+
FileMeta, FileScanConfigBuilder, ParquetFileReaderFactory, ParquetSource,
3434
};
3535
use datafusion::datasource::TableProvider;
3636
use datafusion::execution::object_store::ObjectStoreUrl;
@@ -55,6 +55,7 @@ use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray};
5555
use arrow::datatypes::SchemaRef;
5656
use async_trait::async_trait;
5757
use bytes::Bytes;
58+
use datafusion::datasource::memory::DataSourceExec;
5859
use futures::future::BoxFuture;
5960
use futures::FutureExt;
6061
use object_store::ObjectStore;
@@ -498,13 +499,15 @@ impl TableProvider for IndexTableProvider {
498499
// provide the factory to create parquet reader without re-reading metadata
499500
.with_parquet_file_reader_factory(Arc::new(reader_factory)),
500501
);
501-
let file_scan_config = FileScanConfig::new(object_store_url, schema, file_source)
502-
.with_limit(limit)
503-
.with_projection(projection.cloned())
504-
.with_file(partitioned_file);
502+
let file_scan_config =
503+
FileScanConfigBuilder::new(object_store_url, schema, file_source)
504+
.with_limit(limit)
505+
.with_projection(projection.cloned())
506+
.with_file(partitioned_file)
507+
.build();
505508

506509
// Finally, put it all together into a DataSourceExec
507-
Ok(file_scan_config.build())
510+
Ok(DataSourceExec::from_data_source(file_scan_config))
508511
}
509512

510513
/// Tell DataFusion to push filters down to the scan method

datafusion-examples/examples/csv_json_opener.rs

+8-7
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,14 @@ use datafusion::{
2424
file_format::file_compression_type::FileCompressionType,
2525
listing::PartitionedFile,
2626
object_store::ObjectStoreUrl,
27-
physical_plan::{
28-
CsvSource, FileScanConfig, FileSource, FileStream, JsonOpener, JsonSource,
29-
},
27+
physical_plan::{CsvSource, FileSource, FileStream, JsonOpener, JsonSource},
3028
},
3129
error::Result,
3230
physical_plan::metrics::ExecutionPlanMetricsSet,
3331
test_util::aggr_test_schema,
3432
};
3533

34+
use datafusion::datasource::physical_plan::FileScanConfigBuilder;
3635
use futures::StreamExt;
3736
use object_store::{local::LocalFileSystem, memory::InMemory, ObjectStore};
3837

@@ -56,14 +55,15 @@ async fn csv_opener() -> Result<()> {
5655

5756
let path = std::path::Path::new(&path).canonicalize()?;
5857

59-
let scan_config = FileScanConfig::new(
58+
let scan_config = FileScanConfigBuilder::new(
6059
ObjectStoreUrl::local_filesystem(),
6160
Arc::clone(&schema),
6261
Arc::new(CsvSource::default()),
6362
)
6463
.with_projection(Some(vec![12, 0]))
6564
.with_limit(Some(5))
66-
.with_file(PartitionedFile::new(path.display().to_string(), 10));
65+
.with_file(PartitionedFile::new(path.display().to_string(), 10))
66+
.build();
6767

6868
let config = CsvSource::new(true, b',', b'"')
6969
.with_comment(Some(b'#'))
@@ -121,14 +121,15 @@ async fn json_opener() -> Result<()> {
121121
Arc::new(object_store),
122122
);
123123

124-
let scan_config = FileScanConfig::new(
124+
let scan_config = FileScanConfigBuilder::new(
125125
ObjectStoreUrl::local_filesystem(),
126126
schema,
127127
Arc::new(JsonSource::default()),
128128
)
129129
.with_projection(Some(vec![1, 0]))
130130
.with_limit(Some(5))
131-
.with_file(PartitionedFile::new(path.to_string(), 10));
131+
.with_file(PartitionedFile::new(path.to_string(), 10))
132+
.build();
132133

133134
let mut stream = FileStream::new(
134135
&scan_config,

datafusion-examples/examples/parquet_index.rs

+10-8
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ use datafusion::common::{
2727
internal_datafusion_err, DFSchema, DataFusionError, Result, ScalarValue,
2828
};
2929
use datafusion::datasource::listing::PartitionedFile;
30-
use datafusion::datasource::physical_plan::{FileScanConfig, ParquetSource};
30+
use datafusion::datasource::memory::DataSourceExec;
31+
use datafusion::datasource::physical_plan::{FileScanConfigBuilder, ParquetSource};
3132
use datafusion::datasource::TableProvider;
3233
use datafusion::execution::object_store::ObjectStoreUrl;
3334
use datafusion::logical_expr::{
@@ -243,8 +244,8 @@ impl TableProvider for IndexTableProvider {
243244
let object_store_url = ObjectStoreUrl::parse("file://")?;
244245
let source =
245246
Arc::new(ParquetSource::default().with_predicate(self.schema(), predicate));
246-
let mut file_scan_config =
247-
FileScanConfig::new(object_store_url, self.schema(), source)
247+
let mut file_scan_config_builder =
248+
FileScanConfigBuilder::new(object_store_url, self.schema(), source)
248249
.with_projection(projection.cloned())
249250
.with_limit(limit);
250251

@@ -253,12 +254,13 @@ impl TableProvider for IndexTableProvider {
253254
for (file_name, file_size) in files {
254255
let path = self.dir.join(file_name);
255256
let canonical_path = fs::canonicalize(path)?;
256-
file_scan_config = file_scan_config.with_file(PartitionedFile::new(
257-
canonical_path.display().to_string(),
258-
file_size,
259-
));
257+
file_scan_config_builder = file_scan_config_builder.with_file(
258+
PartitionedFile::new(canonical_path.display().to_string(), file_size),
259+
);
260260
}
261-
Ok(file_scan_config.build())
261+
Ok(DataSourceExec::from_data_source(
262+
file_scan_config_builder.build(),
263+
))
262264
}
263265

264266
/// Tell DataFusion to push filters down to the scan method

datafusion/core/src/datasource/file_format/arrow.rs

+8-2
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ use datafusion_common::{
4949
use datafusion_common_runtime::{JoinSet, SpawnedTask};
5050
use datafusion_datasource::display::FileGroupDisplay;
5151
use datafusion_datasource::file::FileSource;
52-
use datafusion_datasource::file_scan_config::FileScanConfig;
52+
use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
5353
use datafusion_datasource::sink::{DataSink, DataSinkExec};
5454
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
5555
use datafusion_expr::dml::InsertOp;
@@ -58,6 +58,7 @@ use datafusion_physical_expr_common::sort_expr::LexRequirement;
5858

5959
use async_trait::async_trait;
6060
use bytes::Bytes;
61+
use datafusion_datasource::source::DataSourceExec;
6162
use futures::stream::BoxStream;
6263
use futures::StreamExt;
6364
use object_store::{GetResultPayload, ObjectMeta, ObjectStore};
@@ -173,7 +174,12 @@ impl FileFormat for ArrowFormat {
173174
conf: FileScanConfig,
174175
_filters: Option<&Arc<dyn PhysicalExpr>>,
175176
) -> Result<Arc<dyn ExecutionPlan>> {
176-
Ok(conf.with_source(Arc::new(ArrowSource::default())).build())
177+
let source = Arc::new(ArrowSource::default());
178+
let config = FileScanConfigBuilder::from(conf)
179+
.with_source(source)
180+
.build();
181+
182+
Ok(DataSourceExec::from_data_source(config))
177183
}
178184

179185
async fn create_writer_physical_plan(

datafusion/core/src/datasource/file_format/mod.rs

+5-5
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,8 @@ pub(crate) mod test_util {
4040

4141
use datafusion_catalog::Session;
4242
use datafusion_common::Result;
43-
use datafusion_datasource::{
44-
file_format::FileFormat, file_scan_config::FileScanConfig, PartitionedFile,
45-
};
43+
use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
44+
use datafusion_datasource::{file_format::FileFormat, PartitionedFile};
4645
use datafusion_execution::object_store::ObjectStoreUrl;
4746

4847
use crate::test::object_store::local_unpartitioned_file;
@@ -79,15 +78,16 @@ pub(crate) mod test_util {
7978
let exec = format
8079
.create_physical_plan(
8180
state,
82-
FileScanConfig::new(
81+
FileScanConfigBuilder::new(
8382
ObjectStoreUrl::local_filesystem(),
8483
file_schema,
8584
format.file_source(),
8685
)
8786
.with_file_groups(file_groups)
8887
.with_statistics(statistics)
8988
.with_projection(projection)
90-
.with_limit(limit),
89+
.with_limit(limit)
90+
.build(),
9191
None,
9292
)
9393
.await?;

datafusion/core/src/datasource/listing/table.rs

+4-3
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ use crate::datasource::{
3434
use crate::execution::context::SessionState;
3535
use datafusion_catalog::TableProvider;
3636
use datafusion_common::{config_err, DataFusionError, Result};
37-
use datafusion_datasource::file_scan_config::FileScanConfig;
37+
use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
3838
use datafusion_expr::dml::InsertOp;
3939
use datafusion_expr::{utils::conjunction, Expr, TableProviderFilterPushDown};
4040
use datafusion_expr::{SortExpr, TableType};
@@ -942,7 +942,7 @@ impl TableProvider for ListingTable {
942942
.format
943943
.create_physical_plan(
944944
session_state,
945-
FileScanConfig::new(
945+
FileScanConfigBuilder::new(
946946
object_store_url,
947947
Arc::clone(&self.file_schema),
948948
self.options.format.file_source(),
@@ -953,7 +953,8 @@ impl TableProvider for ListingTable {
953953
.with_projection(projection.cloned())
954954
.with_limit(limit)
955955
.with_output_ordering(output_ordering)
956-
.with_table_partition_cols(table_partition_cols),
956+
.with_table_partition_cols(table_partition_cols)
957+
.build(),
957958
filters.as_ref(),
958959
)
959960
.await

datafusion/core/src/datasource/memory.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,7 @@ impl TableProvider for MemTable {
251251
source = source.try_with_sort_information(file_sort_order)?;
252252
}
253253

254-
Ok(Arc::new(DataSourceExec::new(Arc::new(source))))
254+
Ok(DataSourceExec::from_data_source(source))
255255
}
256256

257257
/// Returns an ExecutionPlan that inserts the execution results of a given [`ExecutionPlan`] into this [`MemTable`].

datafusion/core/src/datasource/mod.rs

+10-5
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ mod tests {
6161
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
6262
use arrow::record_batch::RecordBatch;
6363
use datafusion_common::test_util::batches_to_sort_string;
64-
use datafusion_datasource::file_scan_config::FileScanConfig;
64+
use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
6565
use datafusion_datasource::schema_adapter::{
6666
DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory, SchemaMapper,
6767
};
@@ -72,6 +72,7 @@ mod tests {
7272

7373
use ::object_store::path::Path;
7474
use ::object_store::ObjectMeta;
75+
use datafusion_datasource::source::DataSourceExec;
7576
use datafusion_physical_plan::collect;
7677
use tempfile::TempDir;
7778

@@ -128,11 +129,15 @@ mod tests {
128129
ParquetSource::default()
129130
.with_schema_adapter_factory(Arc::new(TestSchemaAdapterFactory {})),
130131
);
131-
let base_conf =
132-
FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema, source)
133-
.with_file(partitioned_file);
132+
let base_conf = FileScanConfigBuilder::new(
133+
ObjectStoreUrl::local_filesystem(),
134+
schema,
135+
source,
136+
)
137+
.with_file(partitioned_file)
138+
.build();
134139

135-
let parquet_exec = base_conf.build();
140+
let parquet_exec = DataSourceExec::from_data_source(base_conf);
136141

137142
let session_ctx = SessionContext::new();
138143
let task_ctx = session_ctx.task_ctx();

datafusion/core/src/datasource/physical_plan/avro.rs

+20-13
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,14 @@ mod tests {
3333
use datafusion_common::test_util::batches_to_string;
3434
use datafusion_common::{test_util, Result, ScalarValue};
3535
use datafusion_datasource::file_format::FileFormat;
36-
use datafusion_datasource::file_scan_config::FileScanConfig;
36+
use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
3737
use datafusion_datasource::PartitionedFile;
3838
use datafusion_datasource_avro::source::AvroSource;
3939
use datafusion_datasource_avro::AvroFormat;
4040
use datafusion_execution::object_store::ObjectStoreUrl;
4141
use datafusion_physical_plan::ExecutionPlan;
4242

43+
use datafusion_datasource::source::DataSourceExec;
4344
use futures::StreamExt;
4445
use insta::assert_snapshot;
4546
use object_store::chunked::ChunkedStore;
@@ -81,12 +82,16 @@ mod tests {
8182
.await?;
8283

8384
let source = Arc::new(AvroSource::new());
84-
let conf =
85-
FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema, source)
86-
.with_file(meta.into())
87-
.with_projection(Some(vec![0, 1, 2]));
88-
89-
let source_exec = conf.build();
85+
let conf = FileScanConfigBuilder::new(
86+
ObjectStoreUrl::local_filesystem(),
87+
file_schema,
88+
source,
89+
)
90+
.with_file(meta.into())
91+
.with_projection(Some(vec![0, 1, 2]))
92+
.build();
93+
94+
let source_exec = DataSourceExec::from_data_source(conf);
9095
assert_eq!(
9196
source_exec
9297
.properties()
@@ -153,11 +158,12 @@ mod tests {
153158
let projection = Some(vec![0, 1, 2, actual_schema.fields().len()]);
154159

155160
let source = Arc::new(AvroSource::new());
156-
let conf = FileScanConfig::new(object_store_url, file_schema, source)
161+
let conf = FileScanConfigBuilder::new(object_store_url, file_schema, source)
157162
.with_file(meta.into())
158-
.with_projection(projection);
163+
.with_projection(projection)
164+
.build();
159165

160-
let source_exec = conf.build();
166+
let source_exec = DataSourceExec::from_data_source(conf);
161167
assert_eq!(
162168
source_exec
163169
.properties()
@@ -222,14 +228,15 @@ mod tests {
222228

223229
let projection = Some(vec![0, 1, file_schema.fields().len(), 2]);
224230
let source = Arc::new(AvroSource::new());
225-
let conf = FileScanConfig::new(object_store_url, file_schema, source)
231+
let conf = FileScanConfigBuilder::new(object_store_url, file_schema, source)
226232
// select specific columns of the files as well as the partitioning
227233
// column which is supposed to be the last column in the table schema.
228234
.with_projection(projection)
229235
.with_file(partitioned_file)
230-
.with_table_partition_cols(vec![Field::new("date", DataType::Utf8, false)]);
236+
.with_table_partition_cols(vec![Field::new("date", DataType::Utf8, false)])
237+
.build();
231238

232-
let source_exec = conf.build();
239+
let source_exec = DataSourceExec::from_data_source(conf);
233240

234241
assert_eq!(
235242
source_exec

0 commit comments

Comments
 (0)