From 97410e2687420c643839dc7948f11161448f34a0 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Wed, 19 Jun 2024 13:10:14 +0000 Subject: [PATCH] refactor(python): Extract utility functions into _utils.pyx (#529) This PR moves a number of utility functions that were relatively self-contained into a new module, _utils.pyx. TODO: - dereplicate the error handling utility - Ensure "exported" functions are documented with a docstring --- python/.gitignore | 3 +- python/setup.py | 12 + python/src/nanoarrow/__init__.py | 2 +- python/src/nanoarrow/_lib.pyx | 358 +----------------- python/src/nanoarrow/_utils.pxd | 60 +++ python/src/nanoarrow/_utils.pyx | 493 +++++++++++++++++++++++++ python/src/nanoarrow/c_array.py | 7 +- python/src/nanoarrow/c_array_stream.py | 5 +- python/src/nanoarrow/c_buffer.py | 5 +- python/src/nanoarrow/c_schema.py | 5 +- python/src/nanoarrow/ipc.py | 5 +- python/tests/test_c_array.py | 5 +- python/tests/test_dlpack.py | 4 +- python/tests/test_ipc.py | 2 +- 14 files changed, 602 insertions(+), 364 deletions(-) create mode 100644 python/src/nanoarrow/_utils.pxd create mode 100644 python/src/nanoarrow/_utils.pyx diff --git a/python/.gitignore b/python/.gitignore index 54bd8aed0..49cfce5e3 100644 --- a/python/.gitignore +++ b/python/.gitignore @@ -17,8 +17,7 @@ # under the License. 
vendor/ -src/nanoarrow/_lib.c -src/nanoarrow/_ipc_lib.c +src/nanoarrow/_*.c # Byte-compiled / optimized / DLL files __pycache__/ diff --git a/python/setup.py b/python/setup.py index bd205baf4..557959f11 100644 --- a/python/setup.py +++ b/python/setup.py @@ -67,6 +67,18 @@ def get_version(pkg_path): setup( ext_modules=[ + Extension( + name="nanoarrow._utils", + include_dirs=["src/nanoarrow", "vendor"], + language="c", + sources=[ + "src/nanoarrow/_utils.pyx", + "vendor/nanoarrow.c", + ], + extra_compile_args=extra_compile_args, + extra_link_args=extra_link_args, + define_macros=extra_define_macros, + ), Extension( name="nanoarrow._lib", include_dirs=["src/nanoarrow", "vendor"], diff --git a/python/src/nanoarrow/__init__.py b/python/src/nanoarrow/__init__.py index b021bbe55..9f9b48e3b 100644 --- a/python/src/nanoarrow/__init__.py +++ b/python/src/nanoarrow/__init__.py @@ -24,7 +24,7 @@ Arrow C Data and Arrow C Stream interfaces. """ -from nanoarrow._lib import c_version +from nanoarrow._utils import c_version from nanoarrow.c_array import c_array_from_buffers, c_array from nanoarrow.c_array_stream import c_array_stream from nanoarrow.c_schema import c_schema diff --git a/python/src/nanoarrow/_lib.pyx b/python/src/nanoarrow/_lib.pyx index 058c27c91..a45a3dcd1 100644 --- a/python/src/nanoarrow/_lib.pyx +++ b/python/src/nanoarrow/_lib.pyx @@ -39,7 +39,6 @@ from cpython.pycapsule cimport PyCapsule_New, PyCapsule_GetPointer, PyCapsule_Is from cpython.unicode cimport PyUnicode_AsUTF8AndSize from cpython cimport ( Py_buffer, - PyObject_CheckBuffer, PyObject_GetBuffer, PyBuffer_Release, PyBuffer_ToContiguous, @@ -58,115 +57,18 @@ from enum import Enum from sys import byteorder as sys_byteorder from struct import unpack_from, iter_unpack, calcsize, Struct from nanoarrow import _repr_utils - -def c_version(): - """Return the nanoarrow C library version string - """ - return ArrowNanoarrowVersion().decode("UTF-8") - - -# CPython utilities that are helpful in Python and not 
available in all -# implementations of ctypes (e.g., early Python versions, pypy) -def _obj_is_capsule(obj, str name): - return PyCapsule_IsValid(obj, name.encode()) == 1 - - -def _obj_is_buffer(obj): - return PyObject_CheckBuffer(obj) == 1 - - -# PyCapsule utilities -# -# PyCapsules are used (1) to safely manage memory associated with C structures -# by initializing them and ensuring the appropriate cleanup is invoked when -# the object is deleted; and (2) as an export mechanism conforming to the -# Arrow PyCapsule interface for the objects where this is defined. -cdef void pycapsule_schema_deleter(object schema_capsule) noexcept: - cdef ArrowSchema* schema = PyCapsule_GetPointer( - schema_capsule, 'arrow_schema' - ) - if schema.release != NULL: - ArrowSchemaRelease(schema) - - ArrowFree(schema) - - -cdef object alloc_c_schema(ArrowSchema** c_schema): - c_schema[0] = ArrowMalloc(sizeof(ArrowSchema)) - # Ensure the capsule destructor doesn't call a random release pointer - c_schema[0].release = NULL - return PyCapsule_New(c_schema[0], 'arrow_schema', &pycapsule_schema_deleter) - - -cdef void pycapsule_array_deleter(object array_capsule) noexcept: - cdef ArrowArray* array = PyCapsule_GetPointer( - array_capsule, 'arrow_array' - ) - # Do not invoke the deleter on a used/moved capsule - if array.release != NULL: - ArrowArrayRelease(array) - - ArrowFree(array) - - -cdef object alloc_c_array(ArrowArray** c_array): - c_array[0] = ArrowMalloc(sizeof(ArrowArray)) - # Ensure the capsule destructor doesn't call a random release pointer - c_array[0].release = NULL - return PyCapsule_New(c_array[0], 'arrow_array', &pycapsule_array_deleter) - - -cdef void pycapsule_array_stream_deleter(object stream_capsule) noexcept: - cdef ArrowArrayStream* stream = PyCapsule_GetPointer( - stream_capsule, 'arrow_array_stream' - ) - # Do not invoke the deleter on a used/moved capsule - if stream.release != NULL: - ArrowArrayStreamRelease(stream) - - ArrowFree(stream) - - -cdef object 
alloc_c_array_stream(ArrowArrayStream** c_stream): - c_stream[0] = ArrowMalloc(sizeof(ArrowArrayStream)) - # Ensure the capsule destructor doesn't call a random release pointer - c_stream[0].release = NULL - return PyCapsule_New(c_stream[0], 'arrow_array_stream', &pycapsule_array_stream_deleter) - - -cdef void pycapsule_device_array_deleter(object device_array_capsule) noexcept: - cdef ArrowDeviceArray* device_array = PyCapsule_GetPointer( - device_array_capsule, 'arrow_device_array' - ) - # Do not invoke the deleter on a used/moved capsule - if device_array.array.release != NULL: - device_array.array.release(&device_array.array) - - ArrowFree(device_array) - - -cdef object alloc_c_device_array(ArrowDeviceArray** c_device_array): - c_device_array[0] = ArrowMalloc(sizeof(ArrowDeviceArray)) - # Ensure the capsule destructor doesn't call a random release pointer - c_device_array[0].array.release = NULL - return PyCapsule_New(c_device_array[0], 'arrow_device_array', &pycapsule_device_array_deleter) - - -cdef void pycapsule_array_view_deleter(object array_capsule) noexcept: - cdef ArrowArrayView* array_view = PyCapsule_GetPointer( - array_capsule, 'nanoarrow_array_view' - ) - - ArrowArrayViewReset(array_view) - - ArrowFree(array_view) - - -cdef object alloc_c_array_view(ArrowArrayView** c_array_view): - c_array_view[0] = ArrowMalloc(sizeof(ArrowArrayView)) - ArrowArrayViewInitFromType(c_array_view[0], NANOARROW_TYPE_UNINITIALIZED) - return PyCapsule_New(c_array_view[0], 'nanoarrow_array_view', &pycapsule_array_view_deleter) - +from nanoarrow._utils cimport ( + alloc_c_schema, + alloc_c_array, + alloc_c_array_stream, + alloc_c_device_array, + alloc_c_array_view, + alloc_c_buffer, + c_array_shallow_copy, + c_device_array_shallow_copy, + c_buffer_set_pybuffer, + Error +) cdef void pycapsule_dlpack_deleter(object dltensor) noexcept: cdef DLManagedTensor* dlm_tensor @@ -259,153 +161,6 @@ cdef DLDevice view_to_dlpack_device(CBufferView view): return device - -# Provide a way 
to validate that we release all references we create -cdef int64_t pyobject_buffer_count = 0 - -def get_pyobject_buffer_count(): - global pyobject_buffer_count - return pyobject_buffer_count - - -cdef void c_deallocate_pyobject_buffer(ArrowBufferAllocator* allocator, uint8_t* ptr, int64_t size) noexcept with gil: - Py_DECREF(allocator.private_data) - - global pyobject_buffer_count - pyobject_buffer_count -= 1 - - -cdef void c_pyobject_buffer(object base, const void* buf, int64_t size_bytes, ArrowBuffer* out): - out.data = buf - out.size_bytes = size_bytes - out.allocator = ArrowBufferDeallocator( - c_deallocate_pyobject_buffer, - base - ) - Py_INCREF(base) - - global pyobject_buffer_count - pyobject_buffer_count += 1 - - -cdef void c_array_shallow_copy(object base, const ArrowArray* src, ArrowArray* dst): - """Make the shallowest (safe) copy possible - - Once a CArray exists at the Python level, nanoarrow makes it very difficult - to perform an operation that might render the pointed-to ArrowArray invalid. - Performing a deep copy (i.e., copying buffer content) would be unexpected and - prohibitively expensive, and performing a truly shallow copy (i.e., adding - an ArrowArray implementation that simply PyINCREF/pyDECREFs the original array) - is not safe because the Arrow C Data interface specification allows children - to be "move"d. Even though nanoarrow's Python bindings do not do this unless - explicitly requested, when passed to some other library they are free to do so. - - This implementation of a shallow copy creates a recursive copy of the original - array, including any children and dictionary (if present). It uses the - C library's ArrowArray implementation, which takes care of releasing children, - and allows us to use the ArrowBufferDeallocator mechanism to add/remove - references to the appropriate PyObject. 
- """ - # Allocate an ArrowArray* that will definitely be cleaned up should an exception - # be raised in the process of shallow copying its contents - cdef ArrowArray* tmp - shelter = alloc_c_array(&tmp) - cdef int code - - code = ArrowArrayInitFromType(tmp, NANOARROW_TYPE_UNINITIALIZED) - Error.raise_error_not_ok("ArrowArrayInitFromType()", code) - - # Copy data for this array, adding a reference for each buffer - # This allows us to use the nanoarrow C library's ArrowArray - # implementation without writing our own release callbacks/private_data. - tmp.length = src.length - tmp.offset = src.offset - tmp.null_count = src.null_count - - for i in range(src.n_buffers): - if src.buffers[i] != NULL: - # The purpose of this buffer is soley so that we can use the - # ArrowBufferDeallocator mechanism to add a reference to base. - # The ArrowArray release callback that exists here after - # because of ArrowArrayInitFromType() will call ArrowBufferReset() - # on any buffer that was injected in this way (and thus release the - # reference to base). We don't actually know the size of the buffer - # (and our release callback doesn't use it), so it is set to 0. - c_pyobject_buffer(base, src.buffers[i], 0, ArrowArrayBuffer(tmp, i)) - - # The actual pointer value is tracked separately from the ArrowBuffer - # (which is only concerned with object lifecycle). 
- tmp.buffers[i] = src.buffers[i] - - tmp.n_buffers = src.n_buffers - - # Recursive shallow copy children - if src.n_children > 0: - code = ArrowArrayAllocateChildren(tmp, src.n_children) - Error.raise_error_not_ok("ArrowArrayAllocateChildren()", code) - - for i in range(src.n_children): - c_array_shallow_copy(base, src.children[i], tmp.children[i]) - - # Recursive shallow copy dictionary - if src.dictionary != NULL: - code = ArrowArrayAllocateDictionary(tmp) - Error.raise_error_not_ok("ArrowArrayAllocateDictionary()", code) - - c_array_shallow_copy(base, src.dictionary, tmp.dictionary) - - # Move tmp into dst - ArrowArrayMove(tmp, dst) - - -cdef void c_device_array_shallow_copy(object base, const ArrowDeviceArray* src, - ArrowDeviceArray* dst) noexcept: - # Copy top-level information but leave the array marked as released - # TODO: Should the sync event be copied here too? - memcpy(dst, src, sizeof(ArrowDeviceArray)) - dst.array.release = NULL - - # Shallow copy the array - c_array_shallow_copy(base, &src.array, &dst.array) - - -cdef void pycapsule_buffer_deleter(object stream_capsule) noexcept: - cdef ArrowBuffer* buffer = PyCapsule_GetPointer( - stream_capsule, 'nanoarrow_buffer' - ) - - ArrowBufferReset(buffer) - ArrowFree(buffer) - - -cdef object alloc_c_buffer(ArrowBuffer** c_buffer): - c_buffer[0] = ArrowMalloc(sizeof(ArrowBuffer)) - ArrowBufferInit(c_buffer[0]) - return PyCapsule_New(c_buffer[0], 'nanoarrow_buffer', &pycapsule_buffer_deleter) - - -cdef void c_deallocate_pybuffer(ArrowBufferAllocator* allocator, uint8_t* ptr, int64_t size) noexcept with gil: - cdef Py_buffer* buffer = allocator.private_data - PyBuffer_Release(buffer) - ArrowFree(buffer) - - -cdef ArrowBufferAllocator c_pybuffer_deallocator(Py_buffer* buffer): - # This should probably be changed in nanoarrow C; however, currently, the deallocator - # won't get called if buffer.buf is NULL. 
- if buffer.buf == NULL: - PyBuffer_Release(buffer) - return ArrowBufferAllocatorDefault() - - cdef Py_buffer* allocator_private = ArrowMalloc(sizeof(Py_buffer)) - if allocator_private == NULL: - PyBuffer_Release(buffer) - raise MemoryError() - - memcpy(allocator_private, buffer, sizeof(Py_buffer)) - return ArrowBufferDeallocator(&c_deallocate_pybuffer, allocator_private) - - cdef c_arrow_type_from_format(format): # PyBuffer_SizeFromFormat() was added in Python 3.9 (potentially faster) item_size = calcsize(format) @@ -523,91 +278,6 @@ cdef int c_format_from_arrow_type(ArrowType type_id, int element_size_bits, size return element_size_bits_calc -cdef object c_buffer_set_pybuffer(object obj, ArrowBuffer** c_buffer): - ArrowBufferReset(c_buffer[0]) - - cdef Py_buffer buffer - cdef int rc = PyObject_GetBuffer(obj, &buffer, PyBUF_FORMAT | PyBUF_ANY_CONTIGUOUS) - if rc != 0: - raise BufferError() - - # Parse the buffer's format string to get the ArrowType and element size - try: - if buffer.format == NULL: - format = "B" - else: - format = buffer.format.decode("UTF-8") - except Exception as e: - PyBuffer_Release(&buffer) - raise e - - # Transfers ownership of buffer to c_buffer, whose finalizer will be called by - # the capsule when the capsule is deleted or garbage collected - c_buffer[0].data = buffer.buf - c_buffer[0].size_bytes = buffer.len - c_buffer[0].capacity_bytes = 0 - c_buffer[0].allocator = c_pybuffer_deallocator(&buffer) - - # Return the calculated components - return format - - -class NanoarrowException(RuntimeError): - """An error resulting from a call to the nanoarrow C library - - Calls to the nanoarrow C library and/or the Arrow C Stream interface - callbacks return an errno error code and sometimes a message with extra - detail. This exception wraps a RuntimeError to format a suitable message - and store the components of the original error. 
- """ - - def __init__(self, what, code, message=""): - self.what = what - self.code = code - self.message = message - - if self.message == "": - super().__init__(f"{self.what} failed ({self.code})") - else: - super().__init__(f"{self.what} failed ({self.code}): {self.message}") - - -cdef class Error: - """Memory holder for an ArrowError - - ArrowError is the C struct that is optionally passed to nanoarrow functions - when a detailed error message might be returned. This class holds a C - reference to the object and provides helpers for raising exceptions based - on the contained message. - """ - cdef ArrowError c_error - - def __cinit__(self): - self.c_error.message[0] = 0 - - def raise_message(self, what, code): - """Raise a NanoarrowException from this message - """ - raise NanoarrowException(what, code, self.c_error.message.decode("UTF-8")) - - def raise_message_not_ok(self, what, code): - if code == NANOARROW_OK: - return - self.raise_message(what, code) - - @staticmethod - def raise_error(what, code): - """Raise a NanoarrowException without a message - """ - raise NanoarrowException(what, code, "") - - @staticmethod - def raise_error_not_ok(what, code): - if code == NANOARROW_OK: - return - Error.raise_error(what, code) - - # This could in theory use cpdef enum, but an initial attempt to do so # resulted Cython duplicating some function definitions. 
For now, we resort # to a more manual trampoline of values to make them accessible from @@ -2845,7 +2515,7 @@ cdef class CArrayBuilder: code = ArrowArrayAppendString(self._ptr, item) if code != NANOARROW_OK: - Error.raise_error(f"append string item {py_item}") + Error.raise_error(f"append string item {py_item}", code) return self @@ -2872,7 +2542,7 @@ cdef class CArrayBuilder: PyBuffer_Release(&buffer) if code != NANOARROW_OK: - Error.raise_error(f"append bytes item {py_item}") + Error.raise_error(f"append bytes item {py_item}", code) def set_offset(self, int64_t offset): self.c_array._assert_valid() diff --git a/python/src/nanoarrow/_utils.pxd b/python/src/nanoarrow/_utils.pxd new file mode 100644 index 000000000..dc2b33a43 --- /dev/null +++ b/python/src/nanoarrow/_utils.pxd @@ -0,0 +1,60 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +# cython: language_level = 3 + +from nanoarrow_c cimport ( + ArrowSchema, + ArrowArray, + ArrowArrayStream, + ArrowArrayView, + ArrowBuffer, + ArrowError +) +from nanoarrow_device_c cimport ArrowDeviceArray + +cdef object alloc_c_schema(ArrowSchema** c_schema) + +cdef object alloc_c_array(ArrowArray** c_array) + +cdef object alloc_c_array_stream(ArrowArrayStream** c_stream) + +cdef object alloc_c_device_array(ArrowDeviceArray** c_device_array) + +cdef object alloc_c_array_view(ArrowArrayView** c_array_view) + +cdef object alloc_c_buffer(ArrowBuffer** c_buffer) + +cdef void c_array_shallow_copy(object base, const ArrowArray* src, ArrowArray* dst) + +cdef void c_device_array_shallow_copy(object base, const ArrowDeviceArray* src, + ArrowDeviceArray* dst) + +cdef object c_buffer_set_pybuffer(object obj, ArrowBuffer** c_buffer) + +cdef class Error: + cdef ArrowError c_error + + cdef raise_message(self, what, code) + + cdef raise_message_not_ok(self, what, code) + + @staticmethod + cdef raise_error(what, code) + + @staticmethod + cdef raise_error_not_ok(what, code) diff --git a/python/src/nanoarrow/_utils.pyx b/python/src/nanoarrow/_utils.pyx new file mode 100644 index 000000000..f9355c8af --- /dev/null +++ b/python/src/nanoarrow/_utils.pyx @@ -0,0 +1,493 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. + +from libc.stdint cimport uint8_t, int64_t +from libc.string cimport memcpy +from cpython.pycapsule cimport PyCapsule_New, PyCapsule_GetPointer, PyCapsule_IsValid +from cpython cimport ( + Py_buffer, + PyObject_CheckBuffer, + PyBuffer_Release, + PyObject_GetBuffer, + PyBUF_FORMAT, + PyBUF_ANY_CONTIGUOUS +) +from cpython.ref cimport Py_INCREF, Py_DECREF + +from nanoarrow_c cimport ( + ArrowArray, + ArrowArrayAllocateChildren, + ArrowArrayAllocateDictionary, + ArrowArrayBuffer, + ArrowArrayInitFromType, + ArrowArrayMove, + ArrowArrayRelease, + ArrowArrayStream, + ArrowArrayStreamRelease, + ArrowArrayView, + ArrowArrayViewInitFromType, + ArrowArrayViewReset, + ArrowBuffer, + ArrowBufferAllocator, + ArrowBufferDeallocator, + ArrowBufferDeallocatorCallback, + ArrowBufferInit, + ArrowBufferReset, + ArrowFree, + ArrowMalloc, + ArrowNanoarrowVersion, + ArrowSchema, + ArrowSchemaRelease, + NANOARROW_OK, + NANOARROW_TYPE_UNINITIALIZED +) + +from nanoarrow_device_c cimport ( + ArrowDeviceArray +) + +def c_version() -> str: + """Return the nanoarrow C library version string + """ + return ArrowNanoarrowVersion().decode("UTF-8") + + +# CPython utilities that are helpful in Python and not available in all +# implementations of ctypes (e.g., early Python versions, pypy) +def obj_is_capsule(obj, str name) -> bool: + """Check if an object is a PyCapsule + + Provided because this function is not reliably available in all + versions of PyPy's ctypes implementation. + + Parameters + ---------- + obj : any + An object to check + name : str + The PyCapsule "name" (e.g., "arrow_array") + """ + return PyCapsule_IsValid(obj, name.encode()) == 1 + + +def obj_is_buffer(obj): + """Check if an object implements the Python Buffer protocol + + Provided because this function is not reliably available in all
+ + Parameters + ---------- + obj : any + An object to check + """ + return PyObject_CheckBuffer(obj) == 1 + + +class NanoarrowException(RuntimeError): + """An error resulting from a call to the nanoarrow C library + + Calls to the nanoarrow C library and/or the Arrow C Stream interface + callbacks return an errno error code and sometimes a message with extra + detail. This exception wraps a RuntimeError to format a suitable message + and store the components of the original error. + + Parameters + ---------- + what : str + A string describing the context in which the exception was generated. + This is usually the name of a nanoarrow C library function. + code : int + An errno code (e.g., EINVAL) returned by a nanoarrow C library function. + message : str + An optional message (e.g., generated by inspecting an ArrowError). + If not provided, a message will be generated based on ``code`` and ``what``. + """ + + def __init__(self, what, code, message=""): + self.what = what + self.code = code + self.message = message + + if self.message == "": + super().__init__(f"{self.what} failed ({self.code})") + else: + super().__init__(f"{self.what} failed ({self.code}): {self.message}") + +cdef class Error: + """Memory holder for an ArrowError + + ArrowError is the C struct that is optionally passed to nanoarrow functions + when a detailed error message might be returned. This class holds a C + reference to the object and provides helpers for raising exceptions based + on the contained message. 
+ """ + + def __cinit__(self): + self.c_error.message[0] = 0 + + cdef raise_message(self, what, code): + """Raise a :class:`NanoarrowException` from the message held by + the wrapped ArrowError + """ + raise NanoarrowException(what, code, self.c_error.message.decode("UTF-8")) + + cdef raise_message_not_ok(self, what, code): + """Call :meth:`raise_message` if code is not NANOARROW_OK""" + if code == NANOARROW_OK: + return + self.raise_message(what, code) + + @staticmethod + cdef raise_error(what, code): + """Raise a :class:`NanoarrowException` without a message + """ + raise NanoarrowException(what, code, "") + + @staticmethod + cdef raise_error_not_ok(what, code): + """Call :meth:`raise_error` if code is not NANOARROW_OK""" + if code == NANOARROW_OK: + return + Error.raise_error(what, code) + + +cdef void pycapsule_schema_deleter(object schema_capsule) noexcept: + """Finalize an ArrowSchema capsule + + Calls the ArrowSchema's release callback if the callback is non-null + and frees the memory for the pointed-to ``struct ArrowSchema``. + """ + cdef ArrowSchema* schema = PyCapsule_GetPointer( + schema_capsule, 'arrow_schema' + ) + if schema.release != NULL: + ArrowSchemaRelease(schema) + + ArrowFree(schema) + + +cdef object alloc_c_schema(ArrowSchema** c_schema): + """Allocate an ArrowSchema and wrap it in a PyCapsule""" + c_schema[0] = ArrowMalloc(sizeof(ArrowSchema)) + # Ensure the capsule destructor doesn't call a random release pointer + c_schema[0].release = NULL + return PyCapsule_New(c_schema[0], 'arrow_schema', &pycapsule_schema_deleter) + + +cdef void pycapsule_array_deleter(object array_capsule) noexcept: + """Finalize an ArrowArray capsule + + Calls the ArrowArray's release callback if the callback is non-null + and frees the memory for the pointed-to ``struct ArrowArray``.
+ """ + cdef ArrowArray* array = PyCapsule_GetPointer( + array_capsule, 'arrow_array' + ) + # Do not invoke the deleter on a used/moved capsule + if array.release != NULL: + ArrowArrayRelease(array) + + ArrowFree(array) + + +cdef object alloc_c_array(ArrowArray** c_array): + """Allocate an ArrowArray and wrap it in a PyCapsule""" + c_array[0] = ArrowMalloc(sizeof(ArrowArray)) + # Ensure the capsule destructor doesn't call a random release pointer + c_array[0].release = NULL + return PyCapsule_New(c_array[0], 'arrow_array', &pycapsule_array_deleter) + + +cdef void pycapsule_array_stream_deleter(object stream_capsule) noexcept: + """Finalize an ArrowArrayStream capsule + + Calls the ArrowArrayStream's release callback if the callback is non-null + and frees the memory for the pointed-to ``struct ArrowArrayStream``. + """ + cdef ArrowArrayStream* stream = PyCapsule_GetPointer( + stream_capsule, 'arrow_array_stream' + ) + # Do not invoke the deleter on a used/moved capsule + if stream.release != NULL: + ArrowArrayStreamRelease(stream) + + ArrowFree(stream) + + +cdef object alloc_c_array_stream(ArrowArrayStream** c_stream): + """Allocate an ArrowArrayStream and wrap it in a PyCapsule""" + c_stream[0] = ArrowMalloc(sizeof(ArrowArrayStream)) + # Ensure the capsule destructor doesn't call a random release pointer + c_stream[0].release = NULL + return PyCapsule_New(c_stream[0], 'arrow_array_stream', &pycapsule_array_stream_deleter) + + +cdef void pycapsule_device_array_deleter(object device_array_capsule) noexcept: + """Finalize an ArrowDeviceArray capsule + + Calls the ``array`` member's release callback if the callback is non-null + and frees the memory for the pointed-to ``struct ArrowDeviceArray``. 
+ """ + cdef ArrowDeviceArray* device_array = PyCapsule_GetPointer( + device_array_capsule, 'arrow_device_array' + ) + # Do not invoke the deleter on a used/moved capsule + if device_array.array.release != NULL: + device_array.array.release(&device_array.array) + + ArrowFree(device_array) + + +cdef object alloc_c_device_array(ArrowDeviceArray** c_device_array): + """Allocate an ArrowDeviceArray and wrap it in a PyCapsule""" + c_device_array[0] = ArrowMalloc(sizeof(ArrowDeviceArray)) + # Ensure the capsule destructor doesn't call a random release pointer + c_device_array[0].array.release = NULL + return PyCapsule_New(c_device_array[0], 'arrow_device_array', &pycapsule_device_array_deleter) + + +cdef void pycapsule_array_view_deleter(object array_capsule) noexcept: + """Finalize an ArrowArrayView capsule + + Calls ``ArrowArrayViewReset()`` on the pointed to ``struct ArrowArrayView`` + and frees the memory associated with the pointer. + """ + cdef ArrowArrayView* array_view = PyCapsule_GetPointer( + array_capsule, 'nanoarrow_array_view' + ) + + ArrowArrayViewReset(array_view) + + ArrowFree(array_view) + + +cdef object alloc_c_array_view(ArrowArrayView** c_array_view): + """Allocate an ArrowArrayView and wrap it in a PyCapsule""" + c_array_view[0] = ArrowMalloc(sizeof(ArrowArrayView)) + ArrowArrayViewInitFromType(c_array_view[0], NANOARROW_TYPE_UNINITIALIZED) + return PyCapsule_New(c_array_view[0], 'nanoarrow_array_view', &pycapsule_array_view_deleter) + + +# Provide a way to validate that we release all references we create +cdef int64_t pyobject_buffer_count = 0 + +def get_pyobject_buffer_count(): + """Get the current borrowed ArrowBuffer count + + Returns a count of Py_INCREF calls where Py_DECREF has not yet been + called on an ArrowBuffer borrowed from a Python object. This function is + used to test shallow copy behaviour for leaked PyObject references. 
+ """ + global pyobject_buffer_count + return pyobject_buffer_count + + +cdef void c_deallocate_pyobject_buffer(ArrowBufferAllocator* allocator, uint8_t* ptr, int64_t size) noexcept with gil: + """ArrowBufferDeallocatorCallback for an ArrowBuffer borrowed from a PyObject + """ + Py_DECREF(allocator.private_data) + + global pyobject_buffer_count + pyobject_buffer_count -= 1 + + +cdef void c_pyobject_buffer(object base, const void* buf, int64_t size_bytes, ArrowBuffer* out): + """Borrow an ArrowBuffer from base + + This function populates ``out`` with an ``ArrowBuffer`` whose allocator has been + populated using ``ArrowBufferDeallocator()`` in such a way that ``Py_INCREF`` is + invoked on base when the buffer is created and ``Py_DECREF`` is invoked on base + when the buffer is destroyed using ``ArrowBufferReset()``. The net incref/decref + count can be checked with :func:`get_pyobject_buffer_count`. + """ + out.data = buf + out.size_bytes = size_bytes + out.allocator = ArrowBufferDeallocator( + c_deallocate_pyobject_buffer, + base + ) + Py_INCREF(base) + + global pyobject_buffer_count + pyobject_buffer_count += 1 + + +cdef void c_array_shallow_copy(object base, const ArrowArray* src, ArrowArray* dst): + """Make the shallowest possible (safe) copy of an ArrowArray + + Once a CArray exists at the Python level, nanoarrow makes it very difficult + to perform an operation that might render the pointed-to ArrowArray invalid. + Performing a deep copy (i.e., copying buffer content) would be unexpected and + prohibitively expensive, and performing a truly shallow copy (i.e., adding + an ArrowArray implementation that simply PyINCREF/PyDECREFs the original array) + is not safe because the Arrow C Data interface specification allows children + to be "move"d. Even though nanoarrow's Python bindings do not do this unless + explicitly requested, when passed to some other library they are free to do so.
+ + This implementation of a shallow copy creates a recursive copy of the original + array, including any children and dictionary (if present). It uses the + C library's ArrowArray implementation, which takes care of releasing children, + and allows us to use the ArrowBufferDeallocator mechanism to add/remove + references to the appropriate PyObject. + """ + # Allocate an ArrowArray* that will definitely be cleaned up should an exception + # be raised in the process of shallow copying its contents + cdef ArrowArray* tmp + shelter = alloc_c_array(&tmp) + cdef int code + + code = ArrowArrayInitFromType(tmp, NANOARROW_TYPE_UNINITIALIZED) + Error.raise_error_not_ok("ArrowArrayInitFromType()", code) + + # Copy data for this array, adding a reference for each buffer + # This allows us to use the nanoarrow C library's ArrowArray + # implementation without writing our own release callbacks/private_data. + tmp.length = src.length + tmp.offset = src.offset + tmp.null_count = src.null_count + + for i in range(src.n_buffers): + if src.buffers[i] != NULL: + # The purpose of this buffer is solely so that we can use the + # ArrowBufferDeallocator mechanism to add a reference to base. + # The ArrowArray release callback that exists here after + # because of ArrowArrayInitFromType() will call ArrowBufferReset() + # on any buffer that was injected in this way (and thus release the + # reference to base). We don't actually know the size of the buffer + # (and our release callback doesn't use it), so it is set to 0. + c_pyobject_buffer(base, src.buffers[i], 0, ArrowArrayBuffer(tmp, i)) + + # The actual pointer value is tracked separately from the ArrowBuffer + # (which is only concerned with object lifecycle).
+ tmp.buffers[i] = src.buffers[i] + + tmp.n_buffers = src.n_buffers + + # Recursive shallow copy children + if src.n_children > 0: + code = ArrowArrayAllocateChildren(tmp, src.n_children) + Error.raise_error_not_ok("ArrowArrayAllocateChildren()", code) + + for i in range(src.n_children): + c_array_shallow_copy(base, src.children[i], tmp.children[i]) + + # Recursive shallow copy dictionary + if src.dictionary != NULL: + code = ArrowArrayAllocateDictionary(tmp) + Error.raise_error_not_ok("ArrowArrayAllocateDictionary()", code) + + c_array_shallow_copy(base, src.dictionary, tmp.dictionary) + + # Move tmp into dst + ArrowArrayMove(tmp, dst) + + +cdef void c_device_array_shallow_copy(object base, const ArrowDeviceArray* src, + ArrowDeviceArray* dst) noexcept: + """Make the shallowest possible (safe) copy of an ArrowDeviceArray + + Wraps :func:`c_array_shallow_copy` to extend its functionality to the + ArrowDeviceArray. It is currently not clear how or if this should + be possible and how or if it should interact with the device's + sync event. + """ + # Copy top-level information but leave the array marked as released + # TODO: Should the sync event be copied here too? + memcpy(dst, src, sizeof(ArrowDeviceArray)) + dst.array.release = NULL + + # Shallow copy the array + c_array_shallow_copy(base, &src.array, &dst.array) + + +cdef void pycapsule_buffer_deleter(object stream_capsule) noexcept: + """Finalize an ArrowBuffer capsule + + Calls ``ArrowBufferReset()`` on the pointed to ``struct ArrowBuffer`` + and frees the memory associated with the pointer. 
+ """ + cdef ArrowBuffer* buffer = PyCapsule_GetPointer( + stream_capsule, 'nanoarrow_buffer' + ) + + ArrowBufferReset(buffer) + ArrowFree(buffer) + + +cdef object alloc_c_buffer(ArrowBuffer** c_buffer): + """Allocate an ArrowBuffer and wrap it in a PyCapsule""" + c_buffer[0] = ArrowMalloc(sizeof(ArrowBuffer)) + ArrowBufferInit(c_buffer[0]) + return PyCapsule_New(c_buffer[0], 'nanoarrow_buffer', &pycapsule_buffer_deleter) + + +cdef void c_deallocate_pybuffer(ArrowBufferAllocator* allocator, uint8_t* ptr, int64_t size) noexcept with gil: + """ArrowBufferDeallocatorCallback for an ArrowBuffer wrapping a Py_buffer""" + cdef Py_buffer* buffer = allocator.private_data + PyBuffer_Release(buffer) + ArrowFree(buffer) + + +cdef ArrowBufferAllocator c_pybuffer_deallocator(Py_buffer* buffer): + """ArrowBufferAllocator implementation wrapping a reference to a Py_buffer""" + cdef Py_buffer* allocator_private = ArrowMalloc(sizeof(Py_buffer)) + if allocator_private == NULL: + PyBuffer_Release(buffer) + raise MemoryError() + + memcpy(allocator_private, buffer, sizeof(Py_buffer)) + return ArrowBufferDeallocator( + &c_deallocate_pybuffer, + allocator_private + ) + + +cdef object c_buffer_set_pybuffer(object obj, ArrowBuffer** c_buffer): + """Manage a Py_buffer reference as an ArrowBuffer + + Populates ``c_buffer`` with an ``ArrowBuffer`` whose allocator has + been set such that when ``ArrowBufferReset()`` is invoked, + ``PyBuffer_Release()`` will be called on the buffer that was obtained + from ``obj`` using ``PyObject_GetBuffer()``. 
+ """ + ArrowBufferReset(c_buffer[0]) + + cdef Py_buffer buffer + cdef int rc = PyObject_GetBuffer(obj, &buffer, PyBUF_FORMAT | PyBUF_ANY_CONTIGUOUS) + if rc != 0: + raise BufferError() + + # Return buffer's format string so that the caller can calculate its ArrowType + try: + if buffer.format == NULL: + format = "B" + else: + format = buffer.format.decode("UTF-8") + except Exception as e: + PyBuffer_Release(&buffer) + raise e + + # Transfers ownership of buffer to c_buffer, whose finalizer will be called by + # the capsule when the capsule is deleted or garbage collected + c_buffer[0].data = buffer.buf + c_buffer[0].size_bytes = buffer.len + c_buffer[0].capacity_bytes = 0 + c_buffer[0].allocator = c_pybuffer_deallocator(&buffer) + + # Return the calculated components + return format diff --git a/python/src/nanoarrow/c_array.py b/python/src/nanoarrow/c_array.py index db33c22ef..1c3e676c0 100644 --- a/python/src/nanoarrow/c_array.py +++ b/python/src/nanoarrow/c_array.py @@ -27,9 +27,8 @@ CSchema, CSchemaBuilder, NoneAwareWrapperIterator, - _obj_is_buffer, - _obj_is_capsule, ) +from nanoarrow._utils import obj_is_buffer, obj_is_capsule from nanoarrow.c_buffer import c_buffer from nanoarrow.c_schema import c_schema, c_schema_view @@ -97,7 +96,7 @@ def c_array(obj, schema=None) -> CArray: ) # Try import of bare capsule - if _obj_is_capsule(obj, "arrow_array"): + if obj_is_capsule(obj, "arrow_array"): if schema is None: schema_capsule = CSchema.allocate()._capsule else: @@ -138,7 +137,7 @@ def _resolve_builder(obj): if _obj_is_empty(obj): return EmptyArrayBuilder - if _obj_is_buffer(obj): + if obj_is_buffer(obj): return ArrayFromPyBufferBuilder if _obj_is_iterable(obj): diff --git a/python/src/nanoarrow/c_array_stream.py b/python/src/nanoarrow/c_array_stream.py index 411f41fdb..ad1fd9812 100644 --- a/python/src/nanoarrow/c_array_stream.py +++ b/python/src/nanoarrow/c_array_stream.py @@ -15,7 +15,8 @@ # specific language governing permissions and limitations # under the 
License. -from nanoarrow._lib import CArrayStream, _obj_is_capsule +from nanoarrow._lib import CArrayStream +from nanoarrow._utils import obj_is_capsule from nanoarrow.c_array import c_array from nanoarrow.c_schema import c_schema @@ -73,7 +74,7 @@ def c_array_stream(obj=None, schema=None) -> CArrayStream: ) # Try import of bare capsule - if _obj_is_capsule(obj, "arrow_array_stream"): + if obj_is_capsule(obj, "arrow_array_stream"): if schema is not None: raise TypeError( "Can't import c_array_stream from capsule with requested schema" diff --git a/python/src/nanoarrow/c_buffer.py b/python/src/nanoarrow/c_buffer.py index 814a5a760..78ccdfa25 100644 --- a/python/src/nanoarrow/c_buffer.py +++ b/python/src/nanoarrow/c_buffer.py @@ -15,7 +15,8 @@ # specific language governing permissions and limitations # under the License. -from nanoarrow._lib import CArrowType, CBuffer, CBufferBuilder, _obj_is_buffer +from nanoarrow._lib import CArrowType, CBuffer, CBufferBuilder +from nanoarrow._utils import obj_is_buffer from nanoarrow.c_schema import c_schema_view @@ -61,7 +62,7 @@ def c_buffer(obj, schema=None) -> CBuffer: if isinstance(obj, CBuffer) and schema is None: return obj - if _obj_is_buffer(obj): + if obj_is_buffer(obj): if schema is not None: raise NotImplementedError( "c_buffer() with schema for pybuffer is not implemented" diff --git a/python/src/nanoarrow/c_schema.py b/python/src/nanoarrow/c_schema.py index a13477116..2312b33e2 100644 --- a/python/src/nanoarrow/c_schema.py +++ b/python/src/nanoarrow/c_schema.py @@ -26,7 +26,8 @@ """ -from nanoarrow._lib import CSchema, CSchemaView, _obj_is_capsule +from nanoarrow._lib import CSchema, CSchemaView +from nanoarrow._utils import obj_is_capsule def c_schema(obj=None) -> CSchema: @@ -65,7 +66,7 @@ def c_schema(obj=None) -> CSchema: if hasattr(obj, "__arrow_c_schema__"): return CSchema._import_from_c_capsule(obj.__arrow_c_schema__()) - if _obj_is_capsule(obj, "arrow_schema"): + if obj_is_capsule(obj, "arrow_schema"): return 
CSchema._import_from_c_capsule(obj) # for pyarrow < 14.0 diff --git a/python/src/nanoarrow/ipc.py b/python/src/nanoarrow/ipc.py index 794ecf360..86d141f35 100644 --- a/python/src/nanoarrow/ipc.py +++ b/python/src/nanoarrow/ipc.py @@ -18,7 +18,8 @@ import io from nanoarrow._ipc_lib import CIpcInputStream, init_array_stream -from nanoarrow._lib import CArrayStream, _obj_is_buffer +from nanoarrow._lib import CArrayStream +from nanoarrow._utils import obj_is_buffer from nanoarrow import _repr_utils @@ -99,7 +100,7 @@ def from_readable(obj): - get_schema(): struct """ - if not hasattr(obj, "readinto") and _obj_is_buffer(obj): + if not hasattr(obj, "readinto") and obj_is_buffer(obj): close_obj = True obj = io.BytesIO(obj) else: diff --git a/python/tests/test_c_array.py b/python/tests/test_c_array.py index 6df964725..0a74c756e 100644 --- a/python/tests/test_c_array.py +++ b/python/tests/test_c_array.py @@ -19,7 +19,8 @@ from datetime import date, datetime, timezone import pytest -from nanoarrow._lib import CArrayBuilder, NanoarrowException +from nanoarrow._lib import CArrayBuilder +from nanoarrow._utils import NanoarrowException from nanoarrow.c_schema import c_schema_view import nanoarrow as na @@ -148,7 +149,7 @@ def test_c_array_shallow_copy(): import gc import platform - from nanoarrow._lib import get_pyobject_buffer_count + from nanoarrow._utils import get_pyobject_buffer_count if platform.python_implementation() == "PyPy": pytest.skip( diff --git a/python/tests/test_dlpack.py b/python/tests/test_dlpack.py index 551057638..03e379f8a 100644 --- a/python/tests/test_dlpack.py +++ b/python/tests/test_dlpack.py @@ -16,7 +16,7 @@ # under the License. 
import pytest -from nanoarrow._lib import _obj_is_capsule +from nanoarrow._utils import obj_is_capsule import nanoarrow as na @@ -25,7 +25,7 @@ def check_dlpack_export(view, expected_arr): DLTensor = view.__dlpack__() - assert _obj_is_capsule(DLTensor, "dltensor") is True + assert obj_is_capsule(DLTensor, "dltensor") is True result = np.from_dlpack(view) np.testing.assert_array_equal(result, expected_arr, strict=True) diff --git a/python/tests/test_ipc.py b/python/tests/test_ipc.py index b9d15321a..e41db8380 100644 --- a/python/tests/test_ipc.py +++ b/python/tests/test_ipc.py @@ -21,7 +21,7 @@ import tempfile import pytest -from nanoarrow._lib import NanoarrowException +from nanoarrow._utils import NanoarrowException from nanoarrow.ipc import Stream import nanoarrow as na