Skip to content

Commit fe8ab01

Browse files
logan-keedealamb
andauthored
make datafusion-catalog-listing and move some implementation of listing out of datafusion/core/datasource/listing (#14464)
* make datafusion_catalog_listing * fix: this is a bit hacky * fixes: prettier, taplo etc * fixes: clippy * minor: permalink commit hash -> main * Tweak README * fix:prettier + wasm * prettier * Put unit tests with code --------- Co-authored-by: Andrew Lamb <[email protected]>
1 parent 22a2061 commit fe8ab01

File tree

14 files changed

+518
-294
lines changed

14 files changed

+518
-294
lines changed

Cargo.toml

+2
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ members = [
2222
"datafusion/common",
2323
"datafusion/common-runtime",
2424
"datafusion/catalog",
25+
"datafusion/catalog-listing",
2526
"datafusion/core",
2627
"datafusion/expr",
2728
"datafusion/expr-common",
@@ -100,6 +101,7 @@ ctor = "0.2.9"
100101
dashmap = "6.0.1"
101102
datafusion = { path = "datafusion/core", version = "45.0.0", default-features = false }
102103
datafusion-catalog = { path = "datafusion/catalog", version = "45.0.0" }
104+
datafusion-catalog-listing = { path = "datafusion/catalog-listing", version = "45.0.0" }
103105
datafusion-common = { path = "datafusion/common", version = "45.0.0", default-features = false }
104106
datafusion-common-runtime = { path = "datafusion/common-runtime", version = "45.0.0" }
105107
datafusion-doc = { path = "datafusion/doc", version = "45.0.0" }

datafusion-cli/Cargo.lock

+23
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/catalog-listing/Cargo.toml

+65
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
[package]
19+
name = "datafusion-catalog-listing"
20+
description = "datafusion-catalog-listing"
21+
authors.workspace = true
22+
edition.workspace = true
23+
homepage.workspace = true
24+
license.workspace = true
25+
readme.workspace = true
26+
repository.workspace = true
27+
rust-version.workspace = true
28+
version.workspace = true
29+
30+
[dependencies]
31+
arrow = { workspace = true }
32+
arrow-schema = { workspace = true }
33+
async-compression = { version = "0.4.0", features = [
34+
"bzip2",
35+
"gzip",
36+
"xz",
37+
"zstd",
38+
"tokio",
39+
], optional = true }
40+
chrono = { workspace = true }
41+
datafusion-catalog = { workspace = true }
42+
datafusion-common = { workspace = true, features = ["object_store"] }
43+
datafusion-execution = { workspace = true }
44+
datafusion-expr = { workspace = true }
45+
datafusion-physical-expr = { workspace = true }
46+
datafusion-physical-expr-common = { workspace = true }
47+
datafusion-physical-plan = { workspace = true }
48+
futures = { workspace = true }
49+
glob = "0.3.0"
50+
itertools = { workspace = true }
51+
log = { workspace = true }
52+
object_store = { workspace = true }
53+
url = { workspace = true }
54+
55+
[dev-dependencies]
56+
async-trait = { workspace = true }
57+
tempfile = { workspace = true }
58+
tokio = { workspace = true }
59+
60+
[lints]
61+
workspace = true
62+
63+
[lib]
64+
name = "datafusion_catalog_listing"
65+
path = "src/mod.rs"
+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
../../LICENSE.txt

datafusion/catalog-listing/NOTICE.txt

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
../../NOTICE.txt

datafusion/catalog-listing/README.md

+30
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
<!---
2+
Licensed to the Apache Software Foundation (ASF) under one
3+
or more contributor license agreements. See the NOTICE file
4+
distributed with this work for additional information
5+
regarding copyright ownership. The ASF licenses this file
6+
to you under the Apache License, Version 2.0 (the
7+
"License"); you may not use this file except in compliance
8+
with the License. You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing,
13+
software distributed under the License is distributed on an
14+
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
KIND, either express or implied. See the License for the
16+
specific language governing permissions and limitations
17+
under the License.
18+
-->
19+
20+
# DataFusion catalog-listing
21+
22+
[DataFusion][df] is an extensible query execution framework, written in Rust, that uses Apache Arrow as its in-memory format.
23+
24+
This crate is a submodule of DataFusion with [ListingTable], an implementation
25+
of [TableProvider] based on files in a directory (either locally or on remote
26+
object storage such as S3).
27+
28+
[df]: https://crates.io/crates/datafusion
29+
[listingtable]: https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.ListingTable.html
30+
[tableprovider]: https://docs.rs/datafusion/latest/datafusion/datasource/trait.TableProvider.html

datafusion/core/src/datasource/listing/helpers.rs renamed to datafusion/catalog-listing/src/helpers.rs

+103-27
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use std::sync::Arc;
2222

2323
use super::ListingTableUrl;
2424
use super::PartitionedFile;
25-
use crate::execution::context::SessionState;
25+
use datafusion_catalog::Session;
2626
use datafusion_common::internal_err;
2727
use datafusion_common::{HashMap, Result, ScalarValue};
2828
use datafusion_expr::{BinaryExpr, Operator};
@@ -154,7 +154,7 @@ pub fn split_files(
154154
chunks
155155
}
156156

157-
struct Partition {
157+
pub struct Partition {
158158
/// The path to the partition, including the table prefix
159159
path: Path,
160160
/// How many path segments below the table prefix `path` contains
@@ -183,7 +183,7 @@ impl Partition {
183183
}
184184

185185
/// Returns a recursive list of the partitions in `table_path` up to `max_depth`
186-
async fn list_partitions(
186+
pub async fn list_partitions(
187187
store: &dyn ObjectStore,
188188
table_path: &ListingTableUrl,
189189
max_depth: usize,
@@ -364,7 +364,7 @@ fn populate_partition_values<'a>(
364364
}
365365
}
366366

367-
fn evaluate_partition_prefix<'a>(
367+
pub fn evaluate_partition_prefix<'a>(
368368
partition_cols: &'a [(String, DataType)],
369369
filters: &'a [Expr],
370370
) -> Option<Path> {
@@ -405,7 +405,7 @@ fn evaluate_partition_prefix<'a>(
405405
/// `filters` should only contain expressions that can be evaluated
406406
/// using only the partition columns.
407407
pub async fn pruned_partition_list<'a>(
408-
ctx: &'a SessionState,
408+
ctx: &'a dyn Session,
409409
store: &'a dyn ObjectStore,
410410
table_path: &'a ListingTableUrl,
411411
filters: &'a [Expr],
@@ -489,7 +489,7 @@ pub async fn pruned_partition_list<'a>(
489489

490490
/// Extract the partition values for the given `file_path` (in the given `table_path`)
491491
/// associated to the partitions defined by `table_partition_cols`
492-
fn parse_partitions_for_path<'a, I>(
492+
pub fn parse_partitions_for_path<'a, I>(
493493
table_path: &ListingTableUrl,
494494
file_path: &'a Path,
495495
table_partition_cols: I,
@@ -517,17 +517,36 @@ where
517517
}
518518
Some(part_values)
519519
}
520+
/// Describe a partition as a (path, depth, files) tuple for easier assertions
521+
pub fn describe_partition(partition: &Partition) -> (&str, usize, Vec<&str>) {
522+
(
523+
partition.path.as_ref(),
524+
partition.depth,
525+
partition
526+
.files
527+
.as_ref()
528+
.map(|f| f.iter().map(|f| f.location.filename().unwrap()).collect())
529+
.unwrap_or_default(),
530+
)
531+
}
520532

521533
#[cfg(test)]
522534
mod tests {
535+
use async_trait::async_trait;
536+
use datafusion_execution::config::SessionConfig;
537+
use datafusion_execution::runtime_env::RuntimeEnv;
538+
use futures::FutureExt;
539+
use object_store::memory::InMemory;
540+
use std::any::Any;
523541
use std::ops::Not;
524-
525-
use futures::StreamExt;
526-
527-
use crate::test::object_store::make_test_store_and_state;
528-
use datafusion_expr::{case, col, lit, Expr};
542+
// use futures::StreamExt;
529543

530544
use super::*;
545+
use datafusion_expr::{
546+
case, col, lit, AggregateUDF, Expr, LogicalPlan, ScalarUDF, WindowUDF,
547+
};
548+
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
549+
use datafusion_physical_plan::ExecutionPlan;
531550

532551
#[test]
533552
fn test_split_files() {
@@ -578,7 +597,7 @@ mod tests {
578597
]);
579598
let filter = Expr::eq(col("mypartition"), lit("val1"));
580599
let pruned = pruned_partition_list(
581-
&state,
600+
state.as_ref(),
582601
store.as_ref(),
583602
&ListingTableUrl::parse("file:///tablepath/").unwrap(),
584603
&[filter],
@@ -603,7 +622,7 @@ mod tests {
603622
]);
604623
let filter = Expr::eq(col("mypartition"), lit("val1"));
605624
let pruned = pruned_partition_list(
606-
&state,
625+
state.as_ref(),
607626
store.as_ref(),
608627
&ListingTableUrl::parse("file:///tablepath/").unwrap(),
609628
&[filter],
@@ -643,7 +662,7 @@ mod tests {
643662
let filter1 = Expr::eq(col("part1"), lit("p1v2"));
644663
let filter2 = Expr::eq(col("part2"), lit("p2v1"));
645664
let pruned = pruned_partition_list(
646-
&state,
665+
state.as_ref(),
647666
store.as_ref(),
648667
&ListingTableUrl::parse("file:///tablepath/").unwrap(),
649668
&[filter1, filter2],
@@ -680,19 +699,6 @@ mod tests {
680699
);
681700
}
682701

683-
/// Describe a partition as a (path, depth, files) tuple for easier assertions
684-
fn describe_partition(partition: &Partition) -> (&str, usize, Vec<&str>) {
685-
(
686-
partition.path.as_ref(),
687-
partition.depth,
688-
partition
689-
.files
690-
.as_ref()
691-
.map(|f| f.iter().map(|f| f.location.filename().unwrap()).collect())
692-
.unwrap_or_default(),
693-
)
694-
}
695-
696702
#[tokio::test]
697703
async fn test_list_partition() {
698704
let (store, _) = make_test_store_and_state(&[
@@ -994,4 +1000,74 @@ mod tests {
9941000
Some(Path::from("a=1970-01-05")),
9951001
);
9961002
}
1003+
1004+
pub fn make_test_store_and_state(
1005+
files: &[(&str, u64)],
1006+
) -> (Arc<InMemory>, Arc<dyn Session>) {
1007+
let memory = InMemory::new();
1008+
1009+
for (name, size) in files {
1010+
memory
1011+
.put(&Path::from(*name), vec![0; *size as usize].into())
1012+
.now_or_never()
1013+
.unwrap()
1014+
.unwrap();
1015+
}
1016+
1017+
(Arc::new(memory), Arc::new(MockSession {}))
1018+
}
1019+
1020+
struct MockSession {}
1021+
1022+
#[async_trait]
1023+
impl Session for MockSession {
1024+
fn session_id(&self) -> &str {
1025+
unimplemented!()
1026+
}
1027+
1028+
fn config(&self) -> &SessionConfig {
1029+
unimplemented!()
1030+
}
1031+
1032+
async fn create_physical_plan(
1033+
&self,
1034+
_logical_plan: &LogicalPlan,
1035+
) -> Result<Arc<dyn ExecutionPlan>> {
1036+
unimplemented!()
1037+
}
1038+
1039+
fn create_physical_expr(
1040+
&self,
1041+
_expr: Expr,
1042+
_df_schema: &DFSchema,
1043+
) -> Result<Arc<dyn PhysicalExpr>> {
1044+
unimplemented!()
1045+
}
1046+
1047+
fn scalar_functions(&self) -> &std::collections::HashMap<String, Arc<ScalarUDF>> {
1048+
unimplemented!()
1049+
}
1050+
1051+
fn aggregate_functions(
1052+
&self,
1053+
) -> &std::collections::HashMap<String, Arc<AggregateUDF>> {
1054+
unimplemented!()
1055+
}
1056+
1057+
fn window_functions(&self) -> &std::collections::HashMap<String, Arc<WindowUDF>> {
1058+
unimplemented!()
1059+
}
1060+
1061+
fn runtime_env(&self) -> &Arc<RuntimeEnv> {
1062+
unimplemented!()
1063+
}
1064+
1065+
fn execution_props(&self) -> &ExecutionProps {
1066+
unimplemented!()
1067+
}
1068+
1069+
fn as_any(&self) -> &dyn Any {
1070+
unimplemented!()
1071+
}
1072+
}
9971073
}

0 commit comments

Comments
 (0)