diff --git a/python/src/nanoarrow/_ipc_lib.pyx b/python/src/nanoarrow/_ipc_lib.pyx index ea15921b8..615ee10c3 100644 --- a/python/src/nanoarrow/_ipc_lib.pyx +++ b/python/src/nanoarrow/_ipc_lib.pyx @@ -63,6 +63,10 @@ cdef class PyInputStreamPrivate: self.size_bytes = 0 self.close_stream = close_stream + # Needed for at least some implementations of readinto() + def __len__(self): + return self.size_bytes + # Implement the buffer protocol so that this object can be used as # the argument to xxx.readinto(). This ensures that no extra copies # (beyond any buffering done by the upstream file-like object) are held diff --git a/python/src/nanoarrow/_lib.pyx b/python/src/nanoarrow/_lib.pyx index e1307d0bc..2877feca5 100644 --- a/python/src/nanoarrow/_lib.pyx +++ b/python/src/nanoarrow/_lib.pyx @@ -766,6 +766,10 @@ cdef class CSchemaView: self._nullable = schema._ptr.flags & ARROW_FLAG_NULLABLE self._map_keys_sorted = schema._ptr.flags & ARROW_FLAG_MAP_KEYS_SORTED + @property + def layout(self): + return CLayout(self, &self._schema_view.layout) + @property def type_id(self): return self._schema_view.type @@ -788,7 +792,8 @@ cdef class CSchemaView: @property def dictionary_ordered(self): - return self._dictionary_ordered != 0 + if self._schema_view.type == NANOARROW_TYPE_DICTIONARY: + return self._dictionary_ordered != 0 @property def nullable(self): @@ -796,7 +801,8 @@ cdef class CSchemaView: @property def map_keys_sorted(self): - return self._map_keys_sorted != 0 + if self._schema_view.type == NANOARROW_TYPE_MAP: + return self._map_keys_sorted != 0 @property def fixed_size(self): @@ -861,6 +867,38 @@ cdef class CSchemaView: return _repr_utils.schema_view_repr(self) +cdef class CLayout: + cdef ArrowLayout* _layout + cdef object _base + cdef int _n_buffers + + def __cinit__(self, base, uintptr_t ptr): + self._base = base + self._layout = ptr + + self._n_buffers = NANOARROW_MAX_FIXED_BUFFERS + for i in range(NANOARROW_MAX_FIXED_BUFFERS): + if self._layout.buffer_type[i] == 
NANOARROW_BUFFER_TYPE_NONE: + self._n_buffers = i + break + + @property + def n_buffers(self): + return self._n_buffers + + @property + def buffer_data_type_id(self): + return tuple(self._layout.buffer_data_type[i] for i in range(self._n_buffers)) + + @property + def element_size_bits(self): + return tuple(self._layout.element_size_bits[i] for i in range(self._n_buffers)) + + @property + def child_size_elements(self): + return self._layout.child_size_elements + + cdef class CSchemaBuilder: cdef CSchema c_schema cdef ArrowSchema* _ptr @@ -1027,6 +1065,36 @@ cdef class CArray: return out + def __getitem__(self, k): + if not isinstance(k, slice): + raise TypeError( + f"Can't slice CArray with object of type {type(k).__name__}" + ) + + if k.step is not None: + raise ValueError("Can't slice CArray with step") + + cdef int64_t start = 0 if k.start is None else k.start + cdef int64_t stop = self._ptr.length if k.stop is None else k.stop + if start < 0: + start = self.length + start + if stop < 0: + stop = self.length + stop + + if start < 0 or start > self._ptr.length or stop > self._ptr.length or stop < start: + raise IndexError( + f"{k} does not describe a valid slice of CArray " + f"with length {self._ptr.length}" + ) + + cdef ArrowArray* c_array_out + base = alloc_c_array(&c_array_out) + c_array_shallow_copy(self._base, self._ptr, c_array_out) + + c_array_out.offset = c_array_out.offset + start + c_array_out.length = stop - start + return CArray(base, c_array_out, self._schema) + def __arrow_c_array__(self, requested_schema=None): """ Get a pair of PyCapsules containing a C ArrowArray representation of the object. @@ -1134,6 +1202,7 @@ cdef class CArrayView: See `nanoarrow.c_array_view()` for construction and usage examples. 
""" cdef object _base + cdef object _array_base cdef ArrowArrayView* _ptr cdef CDevice _device @@ -1142,12 +1211,34 @@ cdef class CArrayView: self._ptr = addr self._device = CDEVICE_CPU + def _set_array(self, CArray array, CDevice device=CDEVICE_CPU): + cdef Error error = Error() + cdef int code + + if device is CDEVICE_CPU: + code = ArrowArrayViewSetArray(self._ptr, array._ptr, &error.c_error) + else: + code = ArrowArrayViewSetArrayMinimal(self._ptr, array._ptr, &error.c_error) + + error.raise_message_not_ok("ArrowArrayViewSetArray()", code) + self._array_base = array._base + self._device = device + return self + + @property + def storage_type_id(self): + return self._ptr.storage_type + @property def storage_type(self): cdef const char* type_str = ArrowTypeString(self._ptr.storage_type) if type_str != NULL: return type_str.decode('UTF-8') + @property + def layout(self): + return CLayout(self, &self._ptr.layout) + @property def length(self): return self._ptr.length @@ -1183,10 +1274,7 @@ cdef class CArrayView: @property def n_buffers(self): - for i in range(3): - if self._ptr.layout.buffer_type[i] == NANOARROW_BUFFER_TYPE_NONE: - return i - return 3 + return self.layout.n_buffers def buffer_type(self, int64_t i): if i < 0 or i >= self.n_buffers: @@ -1220,7 +1308,7 @@ cdef class CArrayView: raise RuntimeError(f"ArrowArrayView buffer {i} has size_bytes < 0") return CBufferView( - self._base, + self._array_base, buffer_view.data.data, buffer_view.size_bytes, self._ptr.layout.buffer_data_type[i], @@ -1247,19 +1335,21 @@ cdef class CArrayView: return _repr_utils.array_view_repr(self) @staticmethod - def from_cpu_array(CArray array): + def from_schema(CSchema schema): cdef ArrowArrayView* c_array_view base = alloc_c_array_view(&c_array_view) cdef Error error = Error() cdef int code = ArrowArrayViewInitFromSchema(c_array_view, - array._schema._ptr, &error.c_error) + schema._ptr, &error.c_error) error.raise_message_not_ok("ArrowArrayViewInitFromSchema()", code) - code = 
ArrowArrayViewSetArray(c_array_view, array._ptr, &error.c_error) - error.raise_message_not_ok("ArrowArrayViewSetArray()", code) + return CArrayView(base, c_array_view) - return CArrayView((base, array), c_array_view) + @staticmethod + def from_array(CArray array, CDevice device=CDEVICE_CPU): + out = CArrayView.from_schema(array._schema) + return out._set_array(array, device) cdef class SchemaMetadata: @@ -1373,6 +1463,14 @@ cdef class CBufferView: return value def __iter__(self): + return self._iter_dispatch(0, len(self)) + + def _iter_dispatch(self, int64_t offset, int64_t length): + if offset < 0 or length < 0 or (offset + length) > len(self): + raise IndexError( + f"offset {offset} and length {length} do not describe a valid slice " + f"of buffer with length {len(self)}" + ) # memoryview's implementation is very fast but not always possible (half float, fixed-size binary, interval) if self._data_type in ( NANOARROW_TYPE_HALF_FLOAT, @@ -1383,11 +1481,14 @@ cdef class CBufferView: ) or ( self._data_type == NANOARROW_TYPE_BINARY and self._element_size_bits != 0 ): - return self._iter_struct() + return self._iter_struct(offset, length) else: - return iter(memoryview(self)) + return self._iter_memoryview(offset, length) - def _iter_struct(self): + def _iter_memoryview(self, int64_t offset, int64_t length): + return iter(memoryview(self)[offset:(offset + length)]) + + def _iter_struct(self, int64_t offset, int64_t length): for value in iter_unpack(self.format, self): if len(value) == 1: yield value[0] @@ -1409,19 +1510,41 @@ cdef class CBufferView: else: return self[i] - @property - def elements(self): + def elements(self, offset=0, length=None): + if length is None: + length = self.n_elements + + if offset < 0 or length < 0 or (offset + length) > self.n_elements: + raise IndexError( + f"offset {offset} and length {length} do not describe a valid slice " + f"of buffer with {self.n_elements} elements" + ) + if self._data_type == NANOARROW_TYPE_BOOL: - return 
self._iter_bitmap() + return self._iter_bitmap(offset, length) else: - return self.__iter__() + return self._iter_dispatch(offset, length) - def _iter_bitmap(self): + def _iter_bitmap(self, int64_t offset, int64_t length): cdef uint8_t item - for i in range(self._shape): - item = self._ptr.data.as_uint8[i] - for j in range(8): - yield (item & (1 << j)) != 0 + cdef int64_t i + + if offset % 8 == 0: + first_byte = offset // 8 + last_byte = self._shape + i = 0 + + for byte_i in range(first_byte, last_byte): + item = self._ptr.data.as_uint8[byte_i] + for j in range(8): + yield (item & (1 << j)) != 0 + i += 1 + if i >= length: + return + else: + for i in range(length): + yield ArrowBitGet(self._ptr.data.as_uint8, offset + i) != 0 + cdef Py_ssize_t _item_size(self): if self._element_size_bits < 8: @@ -1611,10 +1734,9 @@ cdef class CBuffer: self._assert_valid() return self._view.element(i) - @property - def elements(self): + def elements(self, offset=0, length=None): self._assert_valid() - return self._view.elements + return self._view.elements(offset, length) def __getbuffer__(self, Py_buffer* buffer, int flags): self._assert_valid() @@ -1921,16 +2043,44 @@ cdef class CArrayStream: cdef ArrowArrayStream* _ptr cdef object _cached_schema + def __cinit__(self, object base, uintptr_t addr): + self._base = base + self._ptr = addr + self._cached_schema = None + @staticmethod def allocate(): cdef ArrowArrayStream* c_array_stream_out base = alloc_c_array_stream(&c_array_stream_out) return CArrayStream(base, c_array_stream_out) - def __cinit__(self, object base, uintptr_t addr): - self._base = base - self._ptr = addr - self._cached_schema = None + @staticmethod + def from_array_list(arrays, CSchema schema, move=False, validate=True): + cdef ArrowArrayStream* c_array_stream_out + base = alloc_c_array_stream(&c_array_stream_out) + + if not move: + schema = schema.__deepcopy__() + + cdef int code = ArrowBasicArrayStreamInit(c_array_stream_out, schema._ptr, len(arrays)) + 
Error.raise_error_not_ok("ArrowBasicArrayStreamInit()", code) + + cdef ArrowArray tmp + cdef CArray array + for i in range(len(arrays)): + array = arrays[i] + if not move: + c_array_shallow_copy(array._base, array._ptr, &tmp) + ArrowBasicArrayStreamSetArray(c_array_stream_out, i, &tmp) + else: + ArrowBasicArrayStreamSetArray(c_array_stream_out, i, array._ptr) + + cdef Error error = Error() + if validate: + code = ArrowBasicArrayStreamValidate(c_array_stream_out, &error.c_error) + error.raise_message_not_ok("ArrowBasicArrayStreamValidate()", code) + + return CArrayStream(base, c_array_stream_out) def release(self): if self.is_valid(): diff --git a/python/src/nanoarrow/c_lib.py b/python/src/nanoarrow/c_lib.py index 20af4678b..68a53b782 100644 --- a/python/src/nanoarrow/c_lib.py +++ b/python/src/nanoarrow/c_lib.py @@ -338,22 +338,35 @@ def c_array_stream(obj=None, schema=None) -> CArrayStream: if isinstance(obj, CArrayStream) and schema is None: return obj + # Try capsule protocol if hasattr(obj, "__arrow_c_stream__"): schema_capsule = None if schema is None else schema.__arrow_c_schema__() return CArrayStream._import_from_c_capsule( obj.__arrow_c_stream__(requested_schema=schema_capsule) ) - # for pyarrow < 14.0 - if hasattr(obj, "_export_to_c"): + # Try import of bare capsule + if _obj_is_capsule(obj, "arrow_array_stream"): + if schema is not None: + raise TypeError( + "Can't import c_array_stream from capsule with requested schema" + ) + return CArrayStream._import_from_c_capsule(obj) + + # Try _export_to_c for RecordBatchReader objects if pyarrow < 14.0 + if _obj_is_pyarrow_record_batch_reader(obj): out = CArrayStream.allocate() obj._export_to_c(out._addr()) return out - else: + + try: + array = c_array(obj, schema=schema) + return CArrayStream.from_array_list([array], array.schema, validate=False) + except Exception as e: raise TypeError( f"Can't convert object of type {type(obj).__name__} " - "to nanoarrow.c_array_stream" - ) + "to nanoarrow.c_array_stream or 
nanoarrow.c_array" + ) from e def c_schema_view(obj) -> CSchemaView: @@ -414,7 +427,7 @@ def c_array_view(obj, schema=None) -> CArrayView: if isinstance(obj, CArrayView) and schema is None: return obj - return CArrayView.from_cpu_array(c_array(obj, schema)) + return CArrayView.from_array(c_array(obj, schema)) def c_buffer(obj, schema=None) -> CBuffer: @@ -538,6 +551,17 @@ def _obj_is_pyarrow_array(obj): return hasattr(obj, "_export_to_c") +def _obj_is_pyarrow_record_batch_reader(obj): + obj_type = type(obj) + if not obj_type.__module__.startswith("pyarrow"): + return False + + if not obj_type.__name__.endswith("RecordBatchReader"): + return False + + return hasattr(obj, "_export_to_c") + + def _obj_is_iterable(obj): return hasattr(obj, "__iter__") diff --git a/python/src/nanoarrow/iterator.py b/python/src/nanoarrow/iterator.py new file mode 100644 index 000000000..4b13bc764 --- /dev/null +++ b/python/src/nanoarrow/iterator.py @@ -0,0 +1,314 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +from functools import cached_property +from itertools import islice +from typing import Iterable, Tuple + +from nanoarrow.c_lib import ( + CArrayView, + CArrowType, + c_array_stream, + c_schema, + c_schema_view, +) + + +def iterator(obj, schema=None) -> Iterable: + """Iterate over items in zero or more arrays + + Returns an iterator over an array stream where each item is a + Python representation of the next element. + + Parameters + ---------- + obj : array stream-like + An array-like or array stream-like object as sanitized by + :func:`c_array_stream`. + schema : schema-like, optional + An optional schema, passed to :func:`c_array_stream`. + + Examples + -------- + + >>> import nanoarrow as na + >>> from nanoarrow import iterator + >>> array = na.c_array([1, 2, 3], na.int32()) + >>> list(iterator.iterator(array)) + [1, 2, 3] + """ + return RowIterator.get_iterator(obj, schema=schema) + + +def itertuples(obj, schema=None) -> Iterable[Tuple]: + """Iterate over rows in zero or more struct arrays + + Returns an iterator over an array stream of struct arrays (i.e., + record batches) where each item is a tuple of the items in each + row. This is different than :func:`iterator`, which encodes struct + columns as dictionaries. + + Parameters + ---------- + obj : array stream-like + An array-like or array stream-like object as sanitized by + :func:`c_array_stream`. + schema : schema-like, optional + An optional schema, passed to :func:`c_array_stream`. + + Examples + -------- + + >>> import nanoarrow as na + >>> from nanoarrow import iterator + >>> import pyarrow as pa + >>> array = pa.record_batch([pa.array([1, 2, 3])], names=["col1"]) + >>> list(iterator.itertuples(array)) + [(1,), (2,), (3,)] + """ + return RowTupleIterator.get_iterator(obj, schema=schema) + + +class ArrayViewIterator: + """Base class for iterators that use an internal ArrowArrayView + as the basis for conversion to Python objects. Intended for internal use. 
+ """ + + def __init__(self, schema, *, _array_view=None): + self._schema = c_schema(schema) + self._schema_view = c_schema_view(schema) + + if _array_view is None: + self._array_view = CArrayView.from_schema(self._schema) + else: + self._array_view = _array_view + + self._children = list( + map(self._make_child, self._schema.children, self._array_view.children) + ) + + if self._schema.dictionary is None: + self._dictionary = None + else: + self._dictionary = self._make_child( + self._schema.dictionary, self._array_view.dictionary + ) + + def _make_child(self, schema, array_view): + return type(self)(schema, _array_view=array_view) + + @cached_property + def _child_names(self): + return [child.name for child in self._schema.children] + + def _contains_nulls(self): + return ( + self._schema_view.nullable + and len(self._array_view.buffer(0)) + and self._array_view.null_count != 0 + ) + + def _set_array(self, array): + self._array_view._set_array(array) + return self + + +class RowIterator(ArrayViewIterator): + """Iterate over the Python object version of values in an ArrowArrayView. + Intended for internal use. 
+ """ + + @classmethod + def get_iterator(cls, obj, schema=None): + with c_array_stream(obj, schema=schema) as stream: + iterator = cls(stream._get_cached_schema()) + for array in stream: + iterator._set_array(array) + yield from iterator._iter1(0, array.length) + + def _iter1(self, offset, length): + type_id = self._schema_view.type_id + if type_id not in _ITEMS_ITER_LOOKUP: + raise KeyError(f"Can't resolve iterator for type '{self._schema_view.type}'") + + factory = getattr(self, _ITEMS_ITER_LOOKUP[type_id]) + return factory(offset, length) + + def _dictionary_iter(self, offset, length): + dictionary = list( + self._dictionary._iter1(0, self._dictionary._array_view.length) + ) + for dict_index in self._primitive_iter(offset, length): + yield None if dict_index is None else dictionary[dict_index] + + def _wrap_iter_nullable(self, validity, items): + for is_valid, item in zip(validity, items): + yield item if is_valid else None + + def _struct_tuple_iter(self, offset, length): + view = self._array_view + offset += view.offset + items = zip(*(child._iter1(offset, length) for child in self._children)) + + if self._contains_nulls(): + validity = view.buffer(0).elements(offset, length) + return self._wrap_iter_nullable(validity, items) + else: + return items + + def _struct_iter(self, offset, length): + names = self._child_names + for item in self._struct_tuple_iter(offset, length): + yield None if item is None else {key: val for key, val in zip(names, item)} + + def _list_iter(self, offset, length): + view = self._array_view + offset += view.offset + + offsets = memoryview(view.buffer(1))[offset : (offset + length + 1)] + starts = offsets[:-1] + ends = offsets[1:] + child = self._children[0] + child_iter = child._iter1(starts[0], ends[-1] - starts[0]) + + if self._contains_nulls(): + validity = view.buffer(0).elements(offset, length) + for is_valid, start, end in zip(validity, starts, ends): + item = list(islice(child_iter, end - start)) + yield item if is_valid else 
None + else: + for start, end in zip(starts, ends): + yield list(islice(child_iter, end - start)) + + def _fixed_size_list_iter(self, offset, length): + view = self._array_view + offset += view.offset + child = self._children[0] + fixed_size = view.layout.child_size_elements + child_iter = child._iter1(offset * fixed_size, length * fixed_size) + + if self._contains_nulls(): + validity = view.buffer(0).elements(offset, length) + for is_valid in validity: + item = list(islice(child_iter, fixed_size)) + yield item if is_valid else None + else: + for _ in range(length): + yield list(islice(child_iter, fixed_size)) + + def _string_iter(self, offset, length): + view = self._array_view + offset += view.offset + offsets = memoryview(view.buffer(1))[offset : (offset + length + 1)] + starts = offsets[:-1] + ends = offsets[1:] + data = memoryview(view.buffer(2)) + + if self._contains_nulls(): + validity = view.buffer(0).elements(offset, length) + for is_valid, start, end in zip(validity, starts, ends): + yield str(data[start:end], "UTF-8") if is_valid else None + else: + for start, end in zip(starts, ends): + yield str(data[start:end], "UTF-8") + + def _binary_iter(self, offset, length): + view = self._array_view + offset += view.offset + offsets = memoryview(view.buffer(1))[offset : (offset + length + 1)] + starts = offsets[:-1] + ends = offsets[1:] + data = memoryview(view.buffer(2)) + + if self._contains_nulls(): + validity = view.buffer(0).elements(offset, length) + for is_valid, start, end in zip(validity, starts, ends): + yield bytes(data[start:end]) if is_valid else None + else: + for start, end in zip(starts, ends): + yield bytes(data[start:end]) + + def _primitive_iter(self, offset, length): + view = self._array_view + offset += view.offset + items = view.buffer(1).elements(offset, length) + + if self._contains_nulls(): + validity = view.buffer(0).elements(offset, length) + return self._wrap_iter_nullable(validity, items) + else: + return iter(items) + + +class 
RowTupleIterator(RowIterator): + """Iterate over rows of a struct array (stream) where each row is a + tuple instead of a dictionary. This is ~3x faster and matches other + Python concepts more closely (e.g., dbapi's cursor, pandas itertuples). + Intended for internal use. + """ + + def __init__(self, schema, *, _array_view=None): + super().__init__(schema, _array_view=_array_view) + if self._schema_view.type != "struct": + raise TypeError( + "RowTupleIterator can only iterate over struct arrays " + f"(got '{self._schema_view.type}')" + ) + + def _make_child(self, schema, array_view): + return RowIterator(schema, _array_view=array_view) + + def _iter1(self, offset, length): + return self._struct_tuple_iter(offset, length) + + +_ITEMS_ITER_LOOKUP = { + CArrowType.BINARY: "_binary_iter", + CArrowType.LARGE_BINARY: "_binary_iter", + CArrowType.STRING: "_string_iter", + CArrowType.LARGE_STRING: "_string_iter", + CArrowType.STRUCT: "_struct_iter", + CArrowType.LIST: "_list_iter", + CArrowType.LARGE_LIST: "_list_iter", + CArrowType.FIXED_SIZE_LIST: "_fixed_size_list_iter", + CArrowType.DICTIONARY: "_dictionary_iter", +} + +_PRIMITIVE_TYPE_NAMES = [ + "BOOL", + "UINT8", + "INT8", + "UINT16", + "INT16", + "UINT32", + "INT32", + "UINT64", + "INT64", + "HALF_FLOAT", + "FLOAT", + "DOUBLE", + "FIXED_SIZE_BINARY", + "INTERVAL_MONTHS", + "INTERVAL_DAY_TIME", + "INTERVAL_MONTH_DAY_NANO", + "DECIMAL128", + "DECIMAL256", +] + +for type_name in _PRIMITIVE_TYPE_NAMES: + type_id = getattr(CArrowType, type_name) + _ITEMS_ITER_LOOKUP[type_id] = "_primitive_iter" diff --git a/python/tests/test_c_array.py b/python/tests/test_c_array.py index f966cb912..046c2de2e 100644 --- a/python/tests/test_c_array.py +++ b/python/tests/test_c_array.py @@ -103,6 +103,46 @@ def test_c_array_type_not_supported(): na.c_array(None) +def test_c_array_slice(): + array = na.c_array([1, 2, 3], na.int32()) + assert array.offset == 0 + assert array.length == 3 + + array2 = array[:] + assert array.offset == 0 + 
assert array.length == 3 + assert array.buffers == array2.buffers + + array2 = array[:2] + assert array2.offset == 0 + assert array2.length == 2 + + array2 = array[:-1] + assert array2.offset == 0 + assert array2.length == 2 + + array2 = array[1:] + assert array2.offset == 1 + assert array2.length == 2 + + array2 = array[-2:] + assert array2.offset == 1 + assert array2.length == 2 + + +def test_c_array_slice_errors(): + array = na.c_array([1, 2, 3], na.int32()) + + with pytest.raises(TypeError): + array[0] + with pytest.raises(IndexError): + array[4:] + with pytest.raises(IndexError): + array[:4] + with pytest.raises(IndexError): + array[1:0] + + def test_c_array_from_pybuffer_uint8(): data = b"abcdefg" c_array = na.c_array(data) diff --git a/python/tests/test_c_array_stream.py b/python/tests/test_c_array_stream.py new file mode 100644 index 000000000..fc6925d82 --- /dev/null +++ b/python/tests/test_c_array_stream.py @@ -0,0 +1,155 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +import pytest +from nanoarrow._lib import NanoarrowException +from nanoarrow.c_lib import CArrayStream + +import nanoarrow as na + + +def test_c_array_stream_from_c_array_stream(): + # Wrapping an existing stream is a no-op + array_stream = CArrayStream.from_array_list([], na.c_schema(na.int32())) + stream_from_stream = na.c_array_stream(array_stream) + assert stream_from_stream is array_stream + + # With requested_schema should go through capsule + array_stream = CArrayStream.from_array_list([], na.c_schema(na.int32())) + with pytest.raises(NotImplementedError): + na.c_array_stream(array_stream, na.int64()) + + +def test_c_array_stream_from_capsule_protocol(): + # Use wrapper object to ensure this is the path taken in the constructor + class CArrayStreamWrapper: + def __init__(self, obj): + self.obj = obj + + def __arrow_c_stream__(self, *args, **kwargs): + return self.obj.__arrow_c_stream__(*args, **kwargs) + + array_stream = CArrayStream.from_array_list([], na.c_schema(na.int32())) + array_stream_wrapper = CArrayStreamWrapper(array_stream) + from_protocol = na.c_array_stream(array_stream_wrapper) + assert array_stream.is_valid() is False + assert from_protocol.get_schema().format == "i" + + +def test_c_array_stream_from_old_pyarrow(): + # Simulate a pyarrow RecordBatchReader with no __arrow_c_stream__ + class MockLegacyPyarrowRecordBatchReader: + def __init__(self, obj): + self.obj = obj + + def _export_to_c(self, *args): + return self.obj._export_to_c(*args) + + MockLegacyPyarrowRecordBatchReader.__module__ = "pyarrow.lib" + + pa = pytest.importorskip("pyarrow") + reader = pa.RecordBatchReader.from_batches(pa.schema([]), []) + mock_reader = MockLegacyPyarrowRecordBatchReader(reader) + + array_stream = na.c_array_stream(mock_reader) + assert array_stream.get_schema().format == "+s" + + +def test_c_array_stream_from_bare_capsule(): + array_stream = CArrayStream.from_array_list([], na.c_schema(na.int32())) + + # Check from bare capsule without supplying a 
schema + capsule = array_stream.__arrow_c_stream__() + from_capsule = na.c_array_stream(capsule) + assert from_capsule.get_schema().format == "i" + + array_stream = CArrayStream.from_array_list([], na.c_schema(na.int32())) + capsule = array_stream.__arrow_c_stream__() + + with pytest.raises(TypeError, match="Can't import c_array_stream"): + na.c_array_stream(capsule, na.int32()) + + +def test_c_array_stream_from_c_array_fallback(): + # Check that arrays are valid input + c_array = na.c_array([1, 2, 3], na.int32()) + array_stream = na.c_array_stream(c_array) + assert array_stream.get_schema().format == "i" + arrays = list(array_stream) + assert len(arrays) == 1 + assert arrays[0].buffers == c_array.buffers + + # Check fallback with schema + array_stream = na.c_array_stream([1, 2, 3], na.int32()) + assert array_stream.get_schema().format == "i" + arrays = list(array_stream) + assert len(arrays) == 1 + + +def test_c_array_stream_error(): + msg = "Can't convert object of type NoneType" + with pytest.raises(TypeError, match=msg): + na.c_array_stream(None) + + +def test_array_stream_from_arrays_schema(): + schema_in = na.c_schema(na.int32()) + + stream = CArrayStream.from_array_list([], schema_in) + assert schema_in.is_valid() + assert list(stream) == [] + assert stream.get_schema().format == "i" + + # Check move of schema + CArrayStream.from_array_list([], schema_in, move=True) + assert schema_in.is_valid() is False + assert stream.get_schema().format == "i" + + +def test_array_stream_from_arrays(): + schema_in = na.c_schema(na.int32()) + array_in = na.c_array([1, 2, 3], schema_in) + array_in_buffers = array_in.buffers + + stream = CArrayStream.from_array_list([array_in], schema_in) + assert array_in.is_valid() + arrays = list(stream) + assert len(arrays) == 1 + assert arrays[0].buffers == array_in_buffers + + # Check move of array + stream = CArrayStream.from_array_list([array_in], schema_in, move=True) + assert array_in.is_valid() is False + arrays = list(stream) + 
assert len(arrays) == 1 + assert arrays[0].buffers == array_in_buffers + + +def test_array_stream_from_arrays_validate(): + schema_in = na.c_schema(na.null()) + array_in = na.c_array([1, 2, 3], na.int32()) + + # Check that we can skip validation and proceed without error + stream = CArrayStream.from_array_list([array_in], schema_in, validate=False) + arrays = list(stream) + assert len(arrays) == 1 + assert arrays[0].n_buffers == 2 + + # ...but that validation does happen by default + msg = "Expected array with 0 buffer" + with pytest.raises(NanoarrowException, match=msg): + CArrayStream.from_array_list([array_in], schema_in) diff --git a/python/tests/test_c_buffer.py b/python/tests/test_c_buffer.py index 0487a382a..b3105e1da 100644 --- a/python/tests/test_c_buffer.py +++ b/python/tests/test_c_buffer.py @@ -119,7 +119,7 @@ def test_c_buffer_integer(): assert buffer[1] == 1 assert buffer[2] == 2 assert list(buffer) == [0, 1, 2] - assert list(buffer.elements) == [0, 1, 2] + assert list(buffer.elements()) == [0, 1, 2] assert buffer.n_elements == len(buffer) assert [buffer.element(i) for i in range(buffer.n_elements)] == list(buffer) @@ -146,7 +146,7 @@ def test_numpy_c_buffer_numeric(): array = np.array([0, 1, 2], dtype) buffer = na.c_buffer(array) assert list(buffer) == list(array) - assert list(buffer.elements) == list(array) + assert list(buffer.elements()) == list(array) array_roundtrip = np.array(buffer, copy=False) np.testing.assert_array_equal(array_roundtrip, array) @@ -294,7 +294,7 @@ def test_c_buffer_bitmap_from_iterable(): assert buffer.data_type == "bool" assert buffer.item_size == 1 assert buffer.element_size_bits == 1 - assert list(buffer.elements) == [ + assert list(buffer.elements()) == [ True, False, False, @@ -305,20 +305,20 @@ def test_c_buffer_bitmap_from_iterable(): False, ] assert [buffer.element(i) for i in range(buffer.n_elements)] == list( - buffer.elements + buffer.elements() ) # Check something exactly one byte buffer = na.c_buffer([True, 
False, False, True] * 2, na.bool()) assert "10011001" in repr(buffer) assert buffer.size_bytes == 1 - assert list(buffer.elements) == [True, False, False, True] * 2 + assert list(buffer.elements()) == [True, False, False, True] * 2 # Check something more than one byte buffer = na.c_buffer([True, False, False, True] * 3, na.bool()) assert "1001100110010000" in repr(buffer) assert buffer.size_bytes == 2 - assert list(buffer.elements) == [True, False, False, True] * 3 + [ + assert list(buffer.elements()) == [True, False, False, True] * 3 + [ False, False, False, diff --git a/python/tests/test_c_buffer_view.py b/python/tests/test_c_buffer_view.py new file mode 100644 index 000000000..79ac2a1be --- /dev/null +++ b/python/tests/test_c_buffer_view.py @@ -0,0 +1,104 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +import pytest +from nanoarrow.c_lib import c_array_view + +import nanoarrow as na + + +def test_buffer_view_bool(): + bool_array_view = c_array_view([1, 0, 0, 1], na.bool()) + view = bool_array_view.buffer(1) + + assert view.element_size_bits == 1 + assert view.size_bytes == 1 + assert view.data_type_id == na.Type.BOOL.value + assert view.data_type == "bool" + assert view.format == "B" + + # Check item interface + assert len(view) == 1 + assert view[0] == 0b1001 + assert list(view) == [0b1001] + + # Check against buffer protocol + mv = memoryview(view) + assert len(mv) == len(view) + assert mv[0] == view[0] + assert list(mv) == list(view) + + # Check element interface + assert view.n_elements == 8 + assert list(view.elements()) == [True, False, False, True] + [False] * 4 + assert [view.element(i) for i in range(8)] == list(view.elements()) + + # Check element slices + assert list(view.elements(0, 4)) == [True, False, False, True] + assert list(view.elements(1, 3)) == [False, False, True] + + msg = "do not describe a valid slice" + with pytest.raises(IndexError, match=msg): + view.elements(-1, None) + with pytest.raises(IndexError, match=msg): + view.elements(0, -1) + with pytest.raises(IndexError, match=msg): + view.elements(0, 9) + + # Check repr + assert "10010000" in repr(view) + + +def test_buffer_view_non_bool(): + array_view = c_array_view([1, 2, 3, 5], na.int32()) + view = array_view.buffer(1) + + assert view.element_size_bits == 32 + assert view.size_bytes == 4 * 4 + assert view.data_type_id == na.Type.INT32.value + assert view.data_type == "int32" + assert view.format == "i" + + # Check item interface + assert len(view) == 4 + assert list(view) == [1, 2, 3, 5] + assert [view[i] for i in range(4)] == list(view) + + # Check against buffer protocol + mv = memoryview(view) + assert len(mv) == len(view) + assert mv[0] == view[0] + assert [mv[i] for i in range(4)] == [view[i] for i in range(4)] + + # Check element interface + assert view.n_elements == 
len(view) + assert list(view.elements()) == list(view) + assert [view.element(i) for i in range(4)] == list(view.elements()) + + # Check element slices + assert list(view.elements(0, 3)) == [1, 2, 3] + assert list(view.elements(1, 3)) == [2, 3, 5] + + with pytest.raises(IndexError, match="do not describe a valid slice"): + view.elements(-1, None) + with pytest.raises(IndexError, match="do not describe a valid slice"): + view.elements(0, -1) + with pytest.raises(IndexError, match="do not describe a valid slice"): + view.elements(1, 4) + + # Check repr + assert "1 2 3 5" in repr(view) diff --git a/python/tests/test_c_schema_view.py b/python/tests/test_c_schema_view.py new file mode 100644 index 000000000..968415a24 --- /dev/null +++ b/python/tests/test_c_schema_view.py @@ -0,0 +1,76 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +from nanoarrow.c_lib import c_schema_view + +import nanoarrow as na + + +def test_schema_view_accessors_basic(): + view = c_schema_view(na.Type.DATE32) + assert view.type == "date32" + assert view.type_id == na.Type.DATE32.value + assert view.storage_type == "int32" + assert view.storage_type_id == na.Type.INT32.value + assert view.nullable is True + assert view.map_keys_sorted is None + assert view.fixed_size is None + assert view.decimal_bitwidth is None + assert view.decimal_precision is None + assert view.decimal_scale is None + assert view.time_unit_id is None + assert view.time_unit is None + assert view.timezone is None + assert view.extension_name is None + assert view.extension_metadata is None + + assert "date32" in repr(view) + assert "fixed_size" not in repr(view) + + +def test_schema_view_accessors_fixed_size(): + view = c_schema_view(na.fixed_size_binary(123)) + assert view.fixed_size == 123 + + +def test_schema_view_accessors_datetime(): + view = c_schema_view(na.timestamp("s", "America/Halifax")) + assert view.timezone == "America/Halifax" + assert view.time_unit_id == na.TimeUnit.SECOND.value + assert view.time_unit == "s" + + +def test_schema_view_accessors_decimal(): + view = c_schema_view(na.decimal128(10, 3)) + assert view.decimal_bitwidth == 128 + assert view.decimal_precision == 10 + assert view.decimal_scale == 3 + + +def test_schema_view_accessors_non_nullable(): + view = c_schema_view(na.int32(nullable=False)) + assert view.nullable is False + + +def test_schema_view_layout_accessors(): + view = c_schema_view(na.Type.INT32) + assert view.layout.n_buffers == 2 + assert view.layout.buffer_data_type_id[0] == na.Type.BOOL.value + assert view.layout.element_size_bits[0] == 1 + assert view.layout.buffer_data_type_id[1] == na.Type.INT32.value + assert view.layout.element_size_bits[1] == 32 + assert view.layout.child_size_elements == 0 diff --git a/python/tests/test_iterator.py b/python/tests/test_iterator.py new file mode 100644 index 
000000000..95d0218ad --- /dev/null +++ b/python/tests/test_iterator.py @@ -0,0 +1,315 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import pytest +from nanoarrow.iterator import iterator, itertuples + +import nanoarrow as na + + +def test_iterator_primitive(): + array = na.c_array([1, 2, 3], na.int32()) + assert list(iterator(array)) == [1, 2, 3] + + sliced = array[1:] + assert list(iterator(sliced)) == [2, 3] + + +def test_iterator_nullable_primitive(): + array = na.c_array_from_buffers( + na.int32(), + 4, + buffers=[ + na.c_buffer([1, 1, 1, 0], na.bool()), + na.c_buffer([1, 2, 3, 0], na.int32()), + ], + ) + assert list(iterator(array)) == [1, 2, 3, None] + + sliced = array[1:] + assert list(iterator(sliced)) == [2, 3, None] + + +def test_iterator_string(): + array = na.c_array_from_buffers( + na.string(), 2, buffers=[None, na.c_buffer([0, 2, 5], na.int32()), b"abcde"] + ) + + assert list(iterator(array)) == ["ab", "cde"] + + sliced = array[1:] + assert list(iterator(sliced)) == ["cde"] + + +def test_iterator_nullable_string(): + array = na.c_array_from_buffers( + na.string(), + 3, + buffers=[ + na.c_buffer([1, 1, 0], na.bool()), + na.c_buffer([0, 2, 5, 5], na.int32()), + b"abcde", + ], + ) + + assert list(iterator(array)) 
== ["ab", "cde", None] + + sliced = array[1:] + assert list(iterator(sliced)) == ["cde", None] + + +def test_iterator_binary(): + array = na.c_array_from_buffers( + na.binary(), 2, buffers=[None, na.c_buffer([0, 2, 5], na.int32()), b"abcde"] + ) + + assert list(iterator(array)) == [b"ab", b"cde"] + + sliced = array[1:] + assert list(iterator(sliced)) == [b"cde"] + + +def test_iterator_nullable_binary(): + array = na.c_array_from_buffers( + na.binary(), + 3, + buffers=[ + na.c_buffer([1, 1, 0], na.bool()), + na.c_buffer([0, 2, 5, 5], na.int32()), + b"abcde", + ], + ) + + assert list(iterator(array)) == [b"ab", b"cde", None] + + sliced = array[1:] + assert list(iterator(sliced)) == [b"cde", None] + + +def test_itertuples(): + array = na.c_array_from_buffers( + na.struct({"col1": na.int32(), "col2": na.bool()}), + length=3, + buffers=[None], + children=[na.c_array([1, 2, 3], na.int32()), na.c_array([1, 0, 1], na.bool())], + ) + + assert list(itertuples(array)) == [(1, True), (2, False), (3, True)] + + sliced = array[1:] + assert list(itertuples(sliced)) == [(2, False), (3, True)] + + sliced_child = na.c_array_from_buffers( + array.schema, + length=2, + buffers=[None], + children=[array.child(0)[1:], array.child(1)[1:]], + ) + assert list(itertuples(sliced_child)) == [(2, False), (3, True)] + + nested_sliced = sliced_child[1:] + assert list(itertuples(nested_sliced)) == [(3, True)] + + +def test_itertuples_nullable(): + array = na.c_array_from_buffers( + na.struct({"col1": na.int32(), "col2": na.bool()}), + length=4, + buffers=[na.c_buffer([True, True, True, False], na.bool())], + children=[ + na.c_array([1, 2, 3, 4], na.int32()), + na.c_array([1, 0, 1, 0], na.bool()), + ], + ) + + assert list(itertuples(array)) == [(1, True), (2, False), (3, True), None] + + sliced = array[1:] + assert list(itertuples(sliced)) == [(2, False), (3, True), None] + + sliced_child = na.c_array_from_buffers( + array.schema, + length=3, + buffers=[na.c_buffer([True, True, False], 
na.bool())], + children=[array.child(0)[1:], array.child(1)[1:]], + ) + assert list(itertuples(sliced_child)) == [(2, False), (3, True), None] + + nested_sliced = sliced_child[1:] + assert list(itertuples(nested_sliced)) == [(3, True), None] + + +def test_itertuples_errors(): + with pytest.raises(TypeError, match="can only iterate over struct arrays"): + list(itertuples(na.c_array([1, 2, 3], na.int32()))) + + +def test_iterator_struct(): + array = na.c_array_from_buffers( + na.struct({"col1": na.int32(), "col2": na.bool()}), + length=3, + buffers=[None], + children=[na.c_array([1, 2, 3], na.int32()), na.c_array([1, 0, 1], na.bool())], + ) + + assert list(iterator(array)) == [ + {"col1": 1, "col2": True}, + {"col1": 2, "col2": False}, + {"col1": 3, "col2": True}, + ] + + sliced = array[1:] + assert list(iterator(sliced)) == [ + {"col1": 2, "col2": False}, + {"col1": 3, "col2": True}, + ] + + +def test_iterator_nullable_struct(): + array = na.c_array_from_buffers( + na.struct({"col1": na.int32(), "col2": na.bool()}), + length=4, + buffers=[na.c_buffer([True, True, True, False], na.bool())], + children=[ + na.c_array([1, 2, 3, 4], na.int32()), + na.c_array([1, 0, 1, 0], na.bool()), + ], + ) + + assert list(iterator(array)) == [ + {"col1": 1, "col2": True}, + {"col1": 2, "col2": False}, + {"col1": 3, "col2": True}, + None, + ] + + sliced = array[1:] + assert list(iterator(sliced)) == [ + {"col1": 2, "col2": False}, + {"col1": 3, "col2": True}, + None, + ] + + +def test_iterator_list(): + pa = pytest.importorskip("pyarrow") + items = [[1, 2, 3], [4, 5, 6], [7, 8, None], [0]] + array = pa.array(items) + assert list(iterator(array)) == items + + sliced = array[1:] + assert list(iterator(sliced)) == [[4, 5, 6], [7, 8, None], [0]] + + array_sliced_child = pa.ListArray.from_arrays([0, 2, 5, 8, 9], array.values[1:]) + assert (list(iterator(array_sliced_child))) == [ + [2, 3], + [4, 5, 6], + [7, 8, None], + [0], + ] + + nested_sliced = array_sliced_child[1:] + assert 
(list(iterator(nested_sliced))) == [ + [4, 5, 6], + [7, 8, None], + [0], + ] + + +def test_iterator_nullable_list(): + pa = pytest.importorskip("pyarrow") + items = [[1, 2, 3], [4, 5, 6], [7, 8, None], [0], None] + array = pa.array(items) + assert list(iterator(array)) == items + + sliced = array[1:] + assert list(iterator(sliced)) == [[4, 5, 6], [7, 8, None], [0], None] + + array_sliced_child = pa.ListArray.from_arrays( + [0, 2, 5, 8, 9, 9], + array.values[1:], + mask=pa.array([False, False, False, False, True]), + ) + assert (list(iterator(array_sliced_child))) == [ + [2, 3], + [4, 5, 6], + [7, 8, None], + [0], + None, + ] + + nested_sliced = array_sliced_child[1:] + assert (list(iterator(nested_sliced))) == [[4, 5, 6], [7, 8, None], [0], None] + + +def test_iterator_fixed_size_list(): + pa = pytest.importorskip("pyarrow") + items = [[1, 2, 3], [4, 5, 6], [7, 8, None]] + array = pa.array(items, pa.list_(pa.int64(), 3)) + assert list(iterator(array)) == items + + sliced = array[1:] + assert list(iterator(sliced)) == [[4, 5, 6], [7, 8, None]] + + array_sliced_child = pa.FixedSizeListArray.from_arrays(array.values[3:], 3) + assert (list(iterator(array_sliced_child))) == [[4, 5, 6], [7, 8, None]] + + nested_sliced = array_sliced_child[1:] + assert (list(iterator(nested_sliced))) == [[7, 8, None]] + + +def test_iterator_nullable_fixed_size_list(): + pa = pytest.importorskip("pyarrow") + items = [[1, 2, 3], [4, 5, 6], [7, 8, None], None] + array = pa.array(items, pa.list_(pa.int64(), 3)) + assert list(iterator(array)) == items + + sliced = array[1:] + assert list(iterator(sliced)) == [[4, 5, 6], [7, 8, None], None] + + # mask argument only available for pyarrow >= 15.0.0 + array_sliced_child = pa.FixedSizeListArray.from_arrays( + array.values[3:], 3, mask=pa.array([False, False, True]) + ) + assert (list(iterator(array_sliced_child))) == [[4, 5, 6], [7, 8, None], None] + + nested_sliced = array_sliced_child[1:] + assert (list(iterator(nested_sliced))) == [[7, 8, None], 
None] + + +def test_iterator_dictionary(): + pa = pytest.importorskip("pyarrow") + + items = ["ab", "cde", "ab", "def", "cde"] + array = pa.array(items).dictionary_encode() + + assert list(iterator(array)) == items + + sliced = array[1:] + assert list(iterator(sliced)) == ["cde", "ab", "def", "cde"] + + +def test_iterator_nullable_dictionary(): + pa = pytest.importorskip("pyarrow") + + items = ["ab", "cde", "ab", "def", "cde", None] + array = pa.array(items).dictionary_encode() + + assert list(iterator(array)) == items + + sliced = array[1:] + assert list(iterator(sliced)) == ["cde", "ab", "def", "cde", None] diff --git a/python/tests/test_nanoarrow.py b/python/tests/test_nanoarrow.py index ea3d95ecc..689905eb3 100644 --- a/python/tests/test_nanoarrow.py +++ b/python/tests/test_nanoarrow.py @@ -371,15 +371,19 @@ def test_buffers_bool(): # Check via element interface assert data_buffer.n_elements == 8 - assert list(data_buffer.elements) == [True] * 3 + [False] * 5 + assert list(data_buffer.elements()) == [True] * 3 + [False] * 5 assert [data_buffer.element(i) for i in range(data_buffer.n_elements)] == list( - data_buffer.elements + data_buffer.elements() ) with pytest.raises(IndexError): data_buffer[8] with pytest.raises(IndexError): data_buffer[-1] + with pytest.raises(IndexError): + next(data_buffer.elements(-1, 4)) + with pytest.raises(IndexError): + next(data_buffer.elements(7, 2)) # Check repr assert "11100000" in repr(data_buffer)