Skip to content

Commit bcc8566

Browse files
renato2099 authored and timsaucer committed
Exposing FFI to python
Exposing FFI to python. Work in progress on python catalog. Fleshing out schema and catalog providers. Adding implementation of python based catalog and schema providers. Small updates after rebase.
1 parent 1391078 commit bcc8566

File tree

15 files changed

+899
-73
lines changed

15 files changed

+899
-73
lines changed

Cargo.lock

Lines changed: 19 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -37,6 +37,7 @@ substrait = ["dep:datafusion-substrait"]
3737
tokio = { version = "1.45", features = ["macros", "rt", "rt-multi-thread", "sync"] }
3838
pyo3 = { version = "0.24", features = ["extension-module", "abi3", "abi3-py39"] }
3939
pyo3-async-runtimes = { version = "0.24", features = ["tokio-runtime"]}
40+
pyo3-log = "0.12.4"
4041
arrow = { version = "55.1.0", features = ["pyarrow"] }
4142
datafusion = { version = "48.0.0", features = ["avro", "unicode_expressions"] }
4243
datafusion-substrait = { version = "48.0.0", optional = true }
@@ -49,6 +50,7 @@ async-trait = "0.1.88"
4950
futures = "0.3"
5051
object_store = { version = "0.12.1", features = ["aws", "gcp", "azure", "http"] }
5152
url = "2"
53+
log = "0.4.27"
5254

5355
[build-dependencies]
5456
prost-types = "0.13.1" # keep in line with `datafusion-substrait`

examples/datafusion-ffi-example/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

examples/datafusion-ffi-example/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -27,6 +27,7 @@ pyo3 = { version = "0.23", features = ["extension-module", "abi3", "abi3-py39"]
2727
arrow = { version = "55.0.0" }
2828
arrow-array = { version = "55.0.0" }
2929
arrow-schema = { version = "55.0.0" }
30+
async-trait = "0.1.88"
3031

3132
[build-dependencies]
3233
pyo3-build-config = "0.23"
Lines changed: 60 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,60 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
from __future__ import annotations
19+
20+
import pyarrow as pa
21+
from datafusion import SessionContext
22+
from datafusion_ffi_example import MyCatalogProvider
23+
24+
25+
def test_catalog_provider():
26+
ctx = SessionContext()
27+
28+
my_catalog_name = "my_catalog"
29+
expected_schema_name = "my_schema"
30+
expected_table_name = "my_table"
31+
expected_table_columns = ["units", "price"]
32+
33+
catalog_provider = MyCatalogProvider()
34+
ctx.register_catalog_provider(my_catalog_name, catalog_provider)
35+
my_catalog = ctx.catalog(my_catalog_name)
36+
37+
my_catalog_schemas = my_catalog.names()
38+
assert expected_schema_name in my_catalog_schemas
39+
my_database = my_catalog.database(expected_schema_name)
40+
assert expected_table_name in my_database.names()
41+
my_table = my_database.table(expected_table_name)
42+
assert expected_table_columns == my_table.schema.names
43+
44+
result = ctx.table(
45+
f"{my_catalog_name}.{expected_schema_name}.{expected_table_name}"
46+
).collect()
47+
assert len(result) == 2
48+
49+
col0_result = [r.column(0) for r in result]
50+
col1_result = [r.column(1) for r in result]
51+
expected_col0 = [
52+
pa.array([10, 20, 30], type=pa.int32()),
53+
pa.array([5, 7], type=pa.int32()),
54+
]
55+
expected_col1 = [
56+
pa.array([1, 2, 5], type=pa.float64()),
57+
pa.array([1.5, 2.5], type=pa.float64()),
58+
]
59+
assert col0_result == expected_col0
60+
assert col1_result == expected_col1
examples/datafusion-ffi-example/src/catalog_provider.rs

Lines changed: 179 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,179 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use pyo3::{pyclass, pymethods, Bound, PyResult, Python};
19+
use std::{any::Any, fmt::Debug, sync::Arc};
20+
21+
use arrow::datatypes::Schema;
22+
use async_trait::async_trait;
23+
use datafusion::{
24+
catalog::{
25+
CatalogProvider, MemoryCatalogProvider, MemorySchemaProvider, SchemaProvider, TableProvider,
26+
},
27+
common::exec_err,
28+
datasource::MemTable,
29+
error::{DataFusionError, Result},
30+
};
31+
use datafusion_ffi::catalog_provider::FFI_CatalogProvider;
32+
use pyo3::types::PyCapsule;
33+
34+
pub fn my_table() -> Arc<dyn TableProvider + 'static> {
35+
use arrow::datatypes::{DataType, Field};
36+
use datafusion::common::record_batch;
37+
38+
let schema = Arc::new(Schema::new(vec![
39+
Field::new("units", DataType::Int32, true),
40+
Field::new("price", DataType::Float64, true),
41+
]));
42+
43+
let partitions = vec![
44+
record_batch!(
45+
("units", Int32, vec![10, 20, 30]),
46+
("price", Float64, vec![1.0, 2.0, 5.0])
47+
)
48+
.unwrap(),
49+
record_batch!(
50+
("units", Int32, vec![5, 7]),
51+
("price", Float64, vec![1.5, 2.5])
52+
)
53+
.unwrap(),
54+
];
55+
56+
Arc::new(MemTable::try_new(schema, vec![partitions]).unwrap())
57+
}
58+
59+
#[derive(Debug)]
60+
pub struct FixedSchemaProvider {
61+
inner: MemorySchemaProvider,
62+
}
63+
64+
impl Default for FixedSchemaProvider {
65+
fn default() -> Self {
66+
let inner = MemorySchemaProvider::new();
67+
68+
let table = my_table();
69+
70+
let _ = inner.register_table("my_table".to_string(), table).unwrap();
71+
72+
Self { inner }
73+
}
74+
}
75+
76+
#[async_trait]
77+
impl SchemaProvider for FixedSchemaProvider {
78+
fn as_any(&self) -> &dyn Any {
79+
self
80+
}
81+
82+
fn table_names(&self) -> Vec<String> {
83+
self.inner.table_names()
84+
}
85+
86+
async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
87+
self.inner.table(name).await
88+
}
89+
90+
fn register_table(
91+
&self,
92+
name: String,
93+
table: Arc<dyn TableProvider>,
94+
) -> Result<Option<Arc<dyn TableProvider>>> {
95+
self.inner.register_table(name, table)
96+
}
97+
98+
fn deregister_table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
99+
self.inner.deregister_table(name)
100+
}
101+
102+
fn table_exist(&self, name: &str) -> bool {
103+
self.inner.table_exist(name)
104+
}
105+
}
106+
107+
/// This catalog provider is intended only for unit tests. Its `Default` implementation
108+
/// prepopulates it with one schema, `my_schema`, which holds the single table `my_table`.
109+
#[pyclass(
110+
name = "MyCatalogProvider",
111+
module = "datafusion_ffi_example",
112+
subclass
113+
)]
114+
#[derive(Debug)]
115+
pub(crate) struct MyCatalogProvider {
116+
inner: MemoryCatalogProvider,
117+
}
118+
119+
impl Default for MyCatalogProvider {
120+
fn default() -> Self {
121+
let inner = MemoryCatalogProvider::new();
122+
123+
let schema_name: &str = "my_schema";
124+
let _ = inner.register_schema(schema_name, Arc::new(FixedSchemaProvider::default()));
125+
126+
Self { inner }
127+
}
128+
}
129+
130+
impl CatalogProvider for MyCatalogProvider {
131+
fn as_any(&self) -> &dyn Any {
132+
self
133+
}
134+
135+
fn schema_names(&self) -> Vec<String> {
136+
self.inner.schema_names()
137+
}
138+
139+
fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
140+
self.inner.schema(name)
141+
}
142+
143+
fn register_schema(
144+
&self,
145+
name: &str,
146+
schema: Arc<dyn SchemaProvider>,
147+
) -> Result<Option<Arc<dyn SchemaProvider>>> {
148+
self.inner.register_schema(name, schema)
149+
}
150+
151+
fn deregister_schema(
152+
&self,
153+
name: &str,
154+
cascade: bool,
155+
) -> Result<Option<Arc<dyn SchemaProvider>>> {
156+
self.inner.deregister_schema(name, cascade)
157+
}
158+
}
159+
160+
#[pymethods]
161+
impl MyCatalogProvider {
162+
#[new]
163+
pub fn new() -> Self {
164+
Self {
165+
inner: Default::default(),
166+
}
167+
}
168+
169+
pub fn __datafusion_catalog_provider__<'py>(
170+
&self,
171+
py: Python<'py>,
172+
) -> PyResult<Bound<'py, PyCapsule>> {
173+
let name = cr"datafusion_catalog_provider".into();
174+
let catalog_provider =
175+
FFI_CatalogProvider::new(Arc::new(MyCatalogProvider::default()), None);
176+
177+
PyCapsule::new(py, catalog_provider, Some(name))
178+
}
179+
}

examples/datafusion-ffi-example/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -15,16 +15,19 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use crate::catalog_provider::MyCatalogProvider;
1819
use crate::table_function::MyTableFunction;
1920
use crate::table_provider::MyTableProvider;
2021
use pyo3::prelude::*;
2122

23+
pub(crate) mod catalog_provider;
2224
pub(crate) mod table_function;
2325
pub(crate) mod table_provider;
2426

2527
#[pymodule]
2628
fn datafusion_ffi_example(m: &Bound<'_, PyModule>) -> PyResult<()> {
2729
m.add_class::<MyTableProvider>()?;
2830
m.add_class::<MyTableFunction>()?;
31+
m.add_class::<MyCatalogProvider>()?;
2932
Ok(())
3033
}

python/datafusion/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -32,7 +32,7 @@
3232

3333
from datafusion.col import col, column
3434

35-
from . import functions, object_store, substrait, unparser
35+
from . import catalog, functions, object_store, substrait, unparser
3636

3737
# The following imports are okay to remain as opaque to the user.
3838
from ._internal import Config
@@ -91,6 +91,7 @@
9191
"TableFunction",
9292
"WindowFrame",
9393
"WindowUDF",
94+
"catalog",
9495
"col",
9596
"column",
9697
"common",

0 commit comments

Comments
 (0)