Make DataFusion Ballista support DataFusion Python and ship pyarrow UDFs to remote task contexts.
Important
This is just a showcase project; it is not meant to be maintained.
It tests the validity of datafusion-python/1003.
Note
This project is part of the DataFusion Ballista showcase series.
To set up the Python environment:

pyenv local 3.12
python3 -m venv .venv
source .venv/bin/activate
pip3 install -r requirements.txt
A patched branch of datafusion-python (see datafusion-python/1003 above) is needed; the stock release does not include the required changes.
A simple script that will execute on the Ballista cluster:
from datafusion import SessionContext, udf, functions as f
import pyarrow
import pyarrow.compute as pc

# a SessionContext created with a url will connect to the ballista cluster
ctx = SessionContext(url="df://localhost:50050")

conversion_rate_multiplier = 0.62137119

# arrow udf definition
def to_miles(km_data):
    return pc.multiply(km_data, conversion_rate_multiplier)

# datafusion udf definition
to_miles_udf = udf(to_miles, [pyarrow.float64()], pyarrow.float64(), "stable")

# load a dataframe to run the udf against (the path is illustrative;
# any dataset with a passenger_count column will do)
df = ctx.read_parquet("./data/yellow_tripdata.parquet")

# it's incorrect to convert passenger_count to miles, but it is enough
# to demonstrate the udf executing remotely
df = df.select(to_miles_udf(f.col("passenger_count")), f.col("passenger_count"))

# show data
df.show()
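For reference, the pyarrow side of the UDF can be exercised locally, without a cluster; a quick check of what to_miles computes on an Arrow array:

import pyarrow as pa
import pyarrow.compute as pc

conversion_rate_multiplier = 0.62137119

def to_miles(km_data):
    return pc.multiply(km_data, conversion_rate_multiplier)

# datafusion hands a pyarrow-based scalar UDF one Arrow array per record
# batch, so calling it on a plain array mirrors the remote execution
print(to_miles(pa.array([0.0, 10.1], type=pa.float64())))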
Note: if the notebook complains about a missing cloudpickle, install it with !pip install cloudpickle; it is not yet declared as a project dependency.
A Rust client can wrap and execute a Python script:
// `state` is the session state carrying the custom codecs (its setup is shown below)
let ctx = SessionContext::remote_with_state("df://localhost:50050", state).await?;

let code = r#"
import pyarrow.compute as pc

conversion_rate_multiplier = 0.62137119

def to_miles(km_data):
    return pc.multiply(km_data, conversion_rate_multiplier)
"#;

// wrap the python source into a datafusion ScalarUDF
let udf = PythonUDF::from_code("to_miles", code).expect("udf created");
let udf = ScalarUDF::from(udf);

ctx.read_parquet("./data/alltypes.parquet", ParquetReadOptions::default())
    .await?
    .select(vec![col("double_col"), udf.call(vec![col("double_col")])])?
    .show()
    .await?;
This should produce:
+------------+------------------------------+
| double_col | to_miles(?table?.double_col) |
+------------+------------------------------+
| 0.0 | 0.0 |
| 10.1 | 6.275849019 |
| 0.0 | 0.0 |
| 10.1 | 6.275849019 |
| 0.0 | 0.0 |
| 10.1 | 6.275849019 |
| 0.0 | 0.0 |
| 10.1 | 6.275849019 |
+------------+------------------------------+
SQL CREATE FUNCTION statements are supported as well; for those the context is additionally configured with a PythonFunctionFactory:

let config = SessionConfig::new_with_ballista()
    .with_ballista_logical_extension_codec(Arc::new(PyLogicalCodec::default()))
    .with_target_partitions(4);

let state = SessionStateBuilder::new()
    .with_config(config)
    .with_default_features()
    .build();

let ctx = SessionContext::remote_with_state("df://localhost:50050", state)
    .await?
    .with_function_factory(Arc::new(PythonFunctionFactory::default()));
let sql = r#"
CREATE FUNCTION to_miles(DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON
AS '
import pyarrow.compute as pc
conversation_rate_multiplier = 0.62137119
def to_miles(km_data):
return pc.multiply(km_data, conversation_rate_multiplier)
'
"#;
ctx.sql(sql).await?.show().await?;
ctx.register_parquet("t", "./data/alltypes.parquet", ParquetReadOptions::default())
.await?;
ctx.sql("select double_col, to_miles(double_col) from t")
.await?
.show()
.await?;
The project implements custom logical (PyLogicalCodec) and physical (PyPhysicalCodec) codecs which handle serialization and deserialization of Python functions using the cloudpickle library.
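Conceptually, the codecs rely on cloudpickle being able to round-trip a function, together with the module-level state it references, through a byte buffer. A minimal Python sketch of that round-trip, independent of the codecs themselves:

import cloudpickle
import pyarrow.compute as pc

conversion_rate_multiplier = 0.62137119

def to_miles(km_data):
    return pc.multiply(km_data, conversion_rate_multiplier)

# serialize the function to bytes; unlike plain pickle, cloudpickle
# captures the globals the function references by value
payload = cloudpickle.dumps(to_miles)

# on the receiving side the bytes become a callable again
restored = cloudpickle.loads(payload)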
The custom codecs are registered at SessionContext creation:
let config = SessionConfig::new_with_ballista()
    .with_ballista_logical_extension_codec(Arc::new(PyLogicalCodec::default()))
    .with_target_partitions(4);

let state = SessionStateBuilder::new()
    .with_config(config)
    .with_default_features()
    .build();

let ctx = SessionContext::remote_with_state("df://localhost:50050", state).await?;
A custom FunctionFactory implementation, PythonFunctionFactory, provides support for CREATE FUNCTION statements.
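How the factory turns the function body into a callable is not shown here; a minimal Python sketch of one plausible approach (the actual PythonUDF::from_code implementation may differ) is to execute the body in a fresh namespace and look the function up by the name declared in CREATE FUNCTION:

import pyarrow as pa

code = """
import pyarrow.compute as pc

conversion_rate_multiplier = 0.62137119

def to_miles(km_data):
    return pc.multiply(km_data, conversion_rate_multiplier)
"""

# run the function body in a fresh namespace; the imports and globals it
# needs end up in that namespace alongside the function itself
namespace = {}
exec(code, namespace)

# pick the callable out by the declared function name
to_miles = namespace["to_miles"]
print(to_miles(pa.array([10.1])))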