Skip to content

Commit

Permalink
c
Browse files Browse the repository at this point in the history
  • Loading branch information
nameexhaustion committed Feb 13, 2025
1 parent 03df8e4 commit 6a5830b
Show file tree
Hide file tree
Showing 26 changed files with 803 additions and 393 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/test-python.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ concurrency:
cancel-in-progress: true

env:
RUSTFLAGS: -C debuginfo=0 # Do not produce debug symbols to keep memory usage down
RUSTFLAGS: -C debuginfo=0 # Do not produce debug symbols to keep memory usage down
RUST_BACKTRACE: 1
PYTHONUTF8: 1

Expand All @@ -39,7 +39,7 @@ jobs:
fail-fast: false
matrix:
os: [ubuntu-latest]
python-version: ['3.9', '3.12', '3.13']
python-version: ['3.9', '3.12', '3.13', '3.12.8', '3.13.1']
include:
- os: windows-latest
python-version: '3.13'
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-core/src/serde/df.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ impl<'de> Deserialize<'de> for DataFrame {
where
D: Deserializer<'de>,
{
deserialize_map_bytes(deserializer, &mut |b| {
deserialize_map_bytes(deserializer, |b| {
let v = &mut b.as_ref();
Self::deserialize_from_reader(v)
})?
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-core/src/serde/series.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ impl<'de> Deserialize<'de> for Series {
where
D: Deserializer<'de>,
{
deserialize_map_bytes(deserializer, &mut |b| {
deserialize_map_bytes(deserializer, |b| {
let v = &mut b.as_ref();
Self::deserialize_from_reader(v)
})?
Expand Down
147 changes: 99 additions & 48 deletions crates/polars-io/src/cloud/credential_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ pub use object_store::gcp::GcpCredential;
use polars_core::config;
use polars_error::{polars_bail, PolarsResult};
#[cfg(feature = "python")]
use polars_utils::python_function::PythonFunction;
#[cfg(feature = "python")]
use python_impl::PythonCredentialProvider;

#[derive(Clone, Debug, PartialEq, Hash, Eq)]
Expand All @@ -43,23 +41,36 @@ impl PlCredentialProvider {
Self::Function(CredentialProviderFunction(Arc::new(func)))
}

/// Intended to be called with an internal `CredentialProviderBuilder`.
#[cfg(feature = "python")]
pub fn from_python_func(func: PythonFunction) -> Self {
Self::Python(python_impl::PythonCredentialProvider(Arc::new(func)))
}
pub fn from_python_builder(func: pyo3::PyObject) -> Self {
use polars_utils::python_function::PythonObject;

#[cfg(feature = "python")]
pub fn from_python_func_object(func: pyo3::PyObject) -> Self {
Self::Python(python_impl::PythonCredentialProvider(Arc::new(
PythonFunction(func),
)))
Self::Python(python_impl::PythonCredentialProvider::from_builder(
Arc::new(PythonObject(func)),
))
}

pub(super) fn func_addr(&self) -> usize {
match self {
Self::Function(CredentialProviderFunction(v)) => Arc::as_ptr(v) as *const () as usize,
#[cfg(feature = "python")]
Self::Python(PythonCredentialProvider(v)) => Arc::as_ptr(v) as *const () as usize,
Self::Python(PythonCredentialProvider {
// This address is only used for hashing, so it is safe to ignore `is_builder`: we
// don't expect the same py_object to be both a builder and a provider.
py_object,
is_builder: _,
}) => Arc::as_ptr(py_object) as *const () as usize,
}
}

/// Resolves this provider into one that is ready to hand out credentials.
///
/// Python passes a `CredentialProviderBuilder`; for the `Python` variant this
/// delegates to `PythonCredentialProvider::try_into_initialized`, which calls
/// the builder to build the final credential provider. The `Function` variant
/// needs no initialization and is returned unchanged.
///
/// Returns `Ok(None)` when the delegated initialization yields no provider,
/// and propagates any error it reports.
pub(crate) fn try_into_initialized(self) -> PolarsResult<Option<Self>> {
    match self {
        // Native providers are already final.
        native @ Self::Function(_) => Ok(Some(native)),
        #[cfg(feature = "python")]
        Self::Python(py_provider) => py_provider
            .try_into_initialized()
            .map(|initialized| initialized.map(Self::Python)),
    }
}
}
Expand Down Expand Up @@ -452,8 +463,8 @@ mod python_impl {
use std::hash::Hash;
use std::sync::Arc;

use polars_error::PolarsError;
use polars_utils::python_function::PythonFunction;
use polars_error::{to_compute_err, PolarsError, PolarsResult};
use polars_utils::python_function::PythonObject;
use pyo3::exceptions::PyValueError;
use pyo3::pybacked::PyBackedStr;
use pyo3::types::{PyAnyMethods, PyDict, PyDictMethods};
Expand All @@ -462,11 +473,63 @@ mod python_impl {
use super::IntoCredentialProvider;

#[derive(Clone, Debug)]
pub struct PythonCredentialProvider(pub(super) Arc<PythonFunction>);
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
/// A credential provider backed by a Python object.
///
/// The wrapped object is in one of two states, tracked by `is_builder`:
/// a `CredentialProviderBuilder` that still has to be called (created via
/// `from_builder`), or an already-built provider (created via `from_provider`).
pub struct PythonCredentialProvider {
/// Shared handle to the underlying Python object (builder or built provider).
pub(super) py_object: Arc<PythonObject>,
/// Indicates `py_object` is a `CredentialProviderBuilder`.
pub(super) is_builder: bool,
}

impl PythonCredentialProvider {
/// Wraps a Python `CredentialProviderBuilder` that has not been called yet.
///
/// The returned value has `is_builder` set to `true`.
///
/// # Panics
///
/// In builds with debug assertions enabled, panics if the class name of
/// `py_object` cannot be read or is not `CredentialProviderBuilder`.
pub(crate) fn from_builder(py_object: Arc<PythonObject>) -> Self {
    // Sanity check for debug builds only: verify the Python-side type by name.
    if cfg!(debug_assertions) {
        Python::with_gil(|py| {
            let class = py_object.getattr(py, "__class__").unwrap();
            let name_attr = class.getattr(py, "__name__").unwrap();
            let cls_name = name_attr
                .extract::<pyo3::pybacked::PyBackedStr>(py)
                .unwrap();

            assert_eq!(&cls_name, "CredentialProviderBuilder");
        });
    }

    let is_builder = true;
    Self {
        py_object,
        is_builder,
    }
}

/// Wraps a Python object that is already a usable credential provider.
///
/// The returned value has `is_builder` set to `false`, so no build step is
/// performed on it later.
pub(crate) fn from_provider(py_object: Arc<PythonObject>) -> Self {
    let is_builder = false;
    Self {
        py_object,
        is_builder,
    }
}

/// Performs initialization if necessary
pub(crate) fn try_into_initialized(self) -> PolarsResult<Option<Self>> {
if self.is_builder {
let opt_initialized_py_object = Python::with_gil(|py| {
let build_fn = self.py_object.getattr(py, "build_credential_provider")?;

let v = build_fn.call0(py)?;
let v = (!v.is_none(py)).then_some(v);

impl From<PythonFunction> for PythonCredentialProvider {
fn from(value: PythonFunction) -> Self {
Self(Arc::new(value))
pyo3::PyResult::Ok(v)
})
.map_err(to_compute_err)?;

Ok(opt_initialized_py_object
.map(PythonObject)
.map(Arc::new)
.map(Self::from_provider))
} else {
// Note: We don't expect to hit here.
Ok(Some(self))
}
}
}

Expand All @@ -479,8 +542,12 @@ mod python_impl {
CredentialProviderFunction, ObjectStoreCredential,
};

assert!(!self.is_builder); // should not be a builder at this point.

let func = self.py_object;

CredentialProviderFunction(Arc::new(move || {
let func = self.0.clone();
let func = func.clone();
Box::pin(async move {
let mut credentials = object_store::aws::AwsCredential {
key_id: String::new(),
Expand Down Expand Up @@ -554,8 +621,12 @@ mod python_impl {
CredentialProviderFunction, ObjectStoreCredential,
};

assert!(!self.is_builder); // should not be a builder at this point.

let func = self.py_object;

CredentialProviderFunction(Arc::new(move || {
let func = self.0.clone();
let func = func.clone();
Box::pin(async move {
let mut credentials = None;

Expand Down Expand Up @@ -621,8 +692,11 @@ mod python_impl {
CredentialProviderFunction, ObjectStoreCredential,
};

assert!(!self.is_builder); // should not be a builder at this point.

let func = self.py_object;
CredentialProviderFunction(Arc::new(move || {
let func = self.0.clone();
let func = func.clone();
Box::pin(async move {
let mut credentials = object_store::gcp::GcpCredential {
bearer: String::new(),
Expand Down Expand Up @@ -666,11 +740,14 @@ mod python_impl {
}
}

// Note: `is_builder` is deliberately left out of hash/eq - we don't expect the same
// Arc<PythonObject> to be held with `is_builder` set to both true and false.

impl Eq for PythonCredentialProvider {}

impl PartialEq for PythonCredentialProvider {
fn eq(&self, other: &Self) -> bool {
Arc::ptr_eq(&self.0, &other.0)
Arc::ptr_eq(&self.py_object, &other.py_object)
}
}

Expand All @@ -680,33 +757,7 @@ mod python_impl {
// * Inner is an `Arc`
// * Visibility is limited to super
// * No code in `mod python_impl` or `super` mutates the Arc inner.
state.write_usize(Arc::as_ptr(&self.0) as *const () as usize)
}
}

#[cfg(feature = "serde")]
mod _serde_impl {
use polars_utils::python_function::PySerializeWrap;

use super::PythonCredentialProvider;

impl serde::Serialize for PythonCredentialProvider {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
PySerializeWrap(self.0.as_ref()).serialize(serializer)
}
}

impl<'a> serde::Deserialize<'a> for PythonCredentialProvider {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'a>,
{
PySerializeWrap::<super::PythonFunction>::deserialize(deserializer)
.map(|x| x.0.into())
}
state.write_usize(Arc::as_ptr(&self.py_object) as *const () as usize)
}
}
}
Expand Down
35 changes: 19 additions & 16 deletions crates/polars-io/src/cloud/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ use regex::Regex;
#[cfg(feature = "http")]
use reqwest::header::HeaderMap;
#[cfg(feature = "serde")]
use serde::Deserializer;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
#[cfg(feature = "cloud")]
use url::Url;
Expand Down Expand Up @@ -80,19 +78,11 @@ pub struct CloudOptions {
pub file_cache_ttl: u64,
pub(crate) config: Option<CloudConfig>,
#[cfg(feature = "cloud")]
#[cfg_attr(feature = "serde", serde(deserialize_with = "deserialize_or_default"))]
/// Note: In most cases you will want to access this via [`CloudOptions::initialized_credential_provider`]
/// rather than directly.
pub(crate) credential_provider: Option<PlCredentialProvider>,
}

#[cfg(all(feature = "serde", feature = "cloud"))]
fn deserialize_or_default<'de, D>(deserializer: D) -> Result<Option<PlCredentialProvider>, D::Error>
where
D: Deserializer<'de>,
{
type T = Option<PlCredentialProvider>;
T::deserialize(deserializer).or_else(|_| Ok(Default::default()))
}

impl Default for CloudOptions {
fn default() -> Self {
Self::default_static_ref().clone()
Expand Down Expand Up @@ -392,7 +382,7 @@ impl CloudOptions {

let builder = builder.with_retry(get_retry_config(self.max_retries));

let builder = if let Some(v) = self.credential_provider.clone() {
let builder = if let Some(v) = self.initialized_credential_provider()? {
builder.with_credentials(v.into_aws_provider())
} else {
builder
Expand Down Expand Up @@ -438,7 +428,7 @@ impl CloudOptions {
.with_url(url)
.with_retry(get_retry_config(self.max_retries));

let builder = if let Some(v) = self.credential_provider.clone() {
let builder = if let Some(v) = self.initialized_credential_provider()? {
if verbose {
eprintln!(
"[CloudOptions::build_azure]: Using credential provider {:?}",
Expand Down Expand Up @@ -470,7 +460,9 @@ impl CloudOptions {
pub fn build_gcp(&self, url: &str) -> PolarsResult<impl object_store::ObjectStore> {
use super::credential_provider::IntoCredentialProvider;

let builder = if self.credential_provider.is_none() {
let credential_provider = self.initialized_credential_provider()?;

let builder = if credential_provider.is_none() {
GoogleCloudStorageBuilder::from_env()
} else {
GoogleCloudStorageBuilder::new()
Expand All @@ -491,7 +483,7 @@ impl CloudOptions {
.with_url(url)
.with_retry(get_retry_config(self.max_retries));

let builder = if let Some(v) = self.credential_provider.clone() {
let builder = if let Some(v) = credential_provider.clone() {
builder.with_credentials(v.into_gcp_provider())
} else {
builder
Expand Down Expand Up @@ -629,6 +621,17 @@ impl CloudOptions {
},
}
}

/// Returns the credential provider in its initialized form, if one is configured.
///
/// Python passes a credential provider builder that needs to be called to get the
/// actual credential provider; this clones the stored provider and initializes the
/// clone via `try_into_initialized`. Yields `Ok(None)` when no provider is set.
#[cfg(feature = "cloud")]
fn initialized_credential_provider(&self) -> PolarsResult<Option<PlCredentialProvider>> {
    match self.credential_provider.clone() {
        Some(provider) => provider.try_into_initialized(),
        None => Ok(None),
    }
}
}

#[cfg(feature = "cloud")]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ where
{
use polars_utils::pl_serialize::deserialize_map_bytes;

deserialize_map_bytes(deserializer, &mut |b| {
deserialize_map_bytes(deserializer, |b| {
let mut b = b.as_ref();
let mut protocol = TCompactInputProtocol::new(&mut b, usize::MAX);
ColumnChunk::read_from_in_protocol(&mut protocol).map_err(D::Error::custom)
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-plan/src/dsl/expr_dyn_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ impl<'a> Deserialize<'a> for SpecialEq<Arc<dyn ColumnsUdf>> {
use serde::de::Error;
#[cfg(feature = "python")]
{
deserialize_map_bytes(deserializer, &mut |buf| {
deserialize_map_bytes(deserializer, |buf| {
if buf.starts_with(crate::dsl::python_dsl::PYTHON_SERDE_MAGIC_BYTE_MARK) {
let udf = crate::dsl::python_dsl::PythonUdfExpression::try_deserialize(&buf)
.map_err(|e| D::Error::custom(format!("{e}")))?;
Expand Down Expand Up @@ -407,7 +407,7 @@ impl<'a> Deserialize<'a> for GetOutput {
use serde::de::Error;
#[cfg(feature = "python")]
{
deserialize_map_bytes(deserializer, &mut |buf| {
deserialize_map_bytes(deserializer, |buf| {
if buf.starts_with(self::python_dsl::PYTHON_SERDE_MAGIC_BYTE_MARK) {
let get_output = self::python_dsl::PythonGetOutput::try_deserialize(&buf)
.map_err(|e| D::Error::custom(format!("{e}")))?;
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-python/src/catalog/unity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ impl PyCatalogClient {
parse_cloud_options(storage_location, cloud_options.unwrap_or_default())?
.with_max_retries(retries)
.with_credential_provider(
credential_provider.map(PlCredentialProvider::from_python_func_object),
credential_provider.map(PlCredentialProvider::from_python_builder),
);

Ok(
Expand Down
Loading

0 comments on commit 6a5830b

Please sign in to comment.