feat(python): Add CArrayView -> Python conversion (#391)
This PR adds a framework for creating Python objects from arrays and
array streams, with implementations for most Arrow types. Notably, it
includes implementations for nested types (struct, list, dictionary) to
make sure the framework won't have to be completely rewritten to
accommodate them. A few types (decimal, datetime) aren't supported yet
but should be reasonably easy to implement by wrapping the iterator
factories included in this PR.
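
As a quick sketch of the nested-type support, here's a struct column with a
list child run through `itertuples()` (the same entry point as in the
benchmark below); the output shown is what I'd expect from the description
above rather than a captured run:

```python
import pyarrow as pa
from nanoarrow import iterator

# One struct column whose fields include a list, to exercise the nested paths
struct_col = pa.array([{"x": 1, "ys": ["a", "b"]}, {"x": 2, "ys": ["c"]}])
batch = pa.record_batch([struct_col], names=["nested"])

# Each row comes back as a tuple; nested values are converted recursively
list(iterator.itertuples(batch))
#> [({'x': 1, 'ys': ['a', 'b']},), ({'x': 2, 'ys': ['c']},)]  (expected)
```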

None of these are exposed via `import nanoarrow as na` yet; I'm
anticipating that the user-facing `nanoarrow.Array` and/or
`nanoarrow.ArrayStream` will use the implementation here in their methods.

A few changes were required at a lower level to make this work:

- It is now possible to use nanoarrow's `ArrowBasicArrayStream`
implementation to create a stream from a previously-resolved list of
arrays (see the sketch after this list). This makes it easier to test
streams, since previously we had no way to create them.
- The constructor for `c_array_stream()` now falls back to `c_array()`,
wrapping the result in a length-one stream (also sketched below). This
makes it easier to write generic code that accepts stream-like input
(like the iterator).
- The `ArrowLayout` needed to be exposed to implement fixed-size list
support.
- I added tests for all of the lower-level changes in dedicated files.
Some of these tests overlap with existing tests in test_nanoarrow; at
some point we should go through test_nanoarrow and separate the tests
(or create an integration test section, since many of those early tests
assumed pyarrow was available).
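
A quick sketch of how the new stream pieces fit together (the names come
from the diff below; the pyarrow arrays are just convenient input and the
snippet hasn't been run against this branch):

```python
import pyarrow as pa
import nanoarrow as na
from nanoarrow._lib import CArrayStream

# Build a stream from a previously-resolved list of arrays sharing a schema
chunks = [na.c_array(pa.array([1, 2, 3])), na.c_array(pa.array([4, 5]))]
stream = CArrayStream.from_array_list(chunks, chunks[0].schema)

# c_array_stream() falling back on c_array(): a single array-like value
# gets wrapped in a length-one stream
single_chunk_stream = na.c_array_stream(pa.array([1, 2, 3]))
```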

The implementation seems to be efficient given the constraint that
assembling the iterators is currently done using Python code.

```python
import numpy as np
import pyarrow as pa
from nanoarrow import iterator

n = int(1e6)
n_cols = 10
arrays = [np.random.random(n) for _ in range(n_cols)]
batch = pa.record_batch(
    arrays,
    names=[f"col{i}" for i in range(n_cols)]
)

%timeit list(iterator.itertuples(batch))
#> 256 ms ± 4.61 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

# Just zipping the arrays
%timeit list(zip(*arrays))
#> 335 ms ± 1.15 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

# A few ways to do this from pyarrow
%timeit list(zip(*(col.to_pylist() for col in batch.columns)))
#> 1.99 s ± 52.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit list(zip(*(col.to_numpy() for col in batch.columns)))
#> 315 ms ± 1.33 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# Works if all columns are the same type (but rows are arrays, not tuples)
%timeit list(np.array(batch))
#> 131 ms ± 484 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

# Test some nested things
n = int(1e4)
n_cols = 10
big_list = [["a", "b", "c", "d", "e"]] * n

arrays = [big_list for _ in range(n_cols)]
batch = pa.record_batch(
    arrays,
    names=[f"col{i}" for i in range(n_cols)]
)

%timeit list(iterator.itertuples(batch))
#> 89.2 ms ± 756 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

%timeit list(zip(*(col.to_pylist() for col in batch.columns)))
#> 288 ms ± 748 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)
```
paleolimbot authored Mar 1, 2024
1 parent 7e601cc commit 7cf50a3
Showing 11 changed files with 1,231 additions and 45 deletions.
4 changes: 4 additions & 0 deletions python/src/nanoarrow/_ipc_lib.pyx
@@ -63,6 +63,10 @@ cdef class PyInputStreamPrivate:
self.size_bytes = 0
self.close_stream = close_stream

# Needed for at least some implementations of readinto()
def __len__(self):
return self.size_bytes

# Implement the buffer protocol so that this object can be used as
# the argument to xxx.readinto(). This ensures that no extra copies
# (beyond any buffering done by the upstream file-like object) are held
212 changes: 181 additions & 31 deletions python/src/nanoarrow/_lib.pyx
@@ -766,6 +766,10 @@ cdef class CSchemaView:
self._nullable = schema._ptr.flags & ARROW_FLAG_NULLABLE
self._map_keys_sorted = schema._ptr.flags & ARROW_FLAG_MAP_KEYS_SORTED

@property
def layout(self):
return CLayout(self, <uintptr_t>&self._schema_view.layout)

@property
def type_id(self):
return self._schema_view.type
@@ -788,15 +792,17 @@ cdef class CSchemaView:

@property
def dictionary_ordered(self):
-        return self._dictionary_ordered != 0
+        if self._schema_view.type == NANOARROW_TYPE_DICTIONARY:
+            return self._dictionary_ordered != 0

@property
def nullable(self):
return self._nullable != 0

@property
def map_keys_sorted(self):
-        return self._map_keys_sorted != 0
+        if self._schema_view.type == NANOARROW_TYPE_MAP:
+            return self._map_keys_sorted != 0

@property
def fixed_size(self):
@@ -861,6 +867,38 @@ cdef class CSchemaView:
return _repr_utils.schema_view_repr(self)


cdef class CLayout:
cdef ArrowLayout* _layout
cdef object _base
cdef int _n_buffers

def __cinit__(self, base, uintptr_t ptr):
self._base = base
self._layout = <ArrowLayout*>ptr

self._n_buffers = NANOARROW_MAX_FIXED_BUFFERS
for i in range(NANOARROW_MAX_FIXED_BUFFERS):
if self._layout.buffer_type[i] == NANOARROW_BUFFER_TYPE_NONE:
self._n_buffers = i
break

@property
def n_buffers(self):
return self._n_buffers

@property
def buffer_data_type_id(self):
return tuple(self._layout.buffer_data_type[i] for i in range(self._n_buffers))

@property
def element_size_bits(self):
return tuple(self._layout.element_size_bits[i] for i in range(self._n_buffers))

@property
def child_size_elements(self):
return self._layout.child_size_elements


cdef class CSchemaBuilder:
cdef CSchema c_schema
cdef ArrowSchema* _ptr
@@ -1027,6 +1065,36 @@ cdef class CArray:

return out

def __getitem__(self, k):
if not isinstance(k, slice):
raise TypeError(
f"Can't slice CArray with object of type {type(k).__name__}"
)

if k.step is not None:
raise ValueError("Can't slice CArray with step")

cdef int64_t start = 0 if k.start is None else k.start
cdef int64_t stop = self._ptr.length if k.stop is None else k.stop
if start < 0:
start = self.length + start
if stop < 0:
stop = self.length + stop

if start > self._ptr.length or stop > self._ptr.length or stop < start:
raise IndexError(
f"{k} does not describe a valid slice of CArray "
f"with length {self._ptr.length}"
)

cdef ArrowArray* c_array_out
base = alloc_c_array(&c_array_out)
c_array_shallow_copy(self._base, self._ptr, c_array_out)

c_array_out.offset = c_array_out.offset + start
c_array_out.length = stop - start
return CArray(base, <uintptr_t>c_array_out, self._schema)

def __arrow_c_array__(self, requested_schema=None):
"""
Get a pair of PyCapsules containing a C ArrowArray representation of the object.
@@ -1134,6 +1202,7 @@ cdef class CArrayView:
See `nanoarrow.c_array_view()` for construction and usage examples.
"""
cdef object _base
cdef object _array_base
cdef ArrowArrayView* _ptr
cdef CDevice _device

@@ -1142,12 +1211,34 @@ cdef class CArrayView:
self._ptr = <ArrowArrayView*>addr
self._device = CDEVICE_CPU

def _set_array(self, CArray array, CDevice device=CDEVICE_CPU):
cdef Error error = Error()
cdef int code

if device is CDEVICE_CPU:
code = ArrowArrayViewSetArray(self._ptr, array._ptr, &error.c_error)
else:
code = ArrowArrayViewSetArrayMinimal(self._ptr, array._ptr, &error.c_error)

error.raise_message_not_ok("ArrowArrayViewSetArray()", code)
self._array_base = array._base
self._device = device
return self

@property
def storage_type_id(self):
return self._ptr.storage_type

@property
def storage_type(self):
cdef const char* type_str = ArrowTypeString(self._ptr.storage_type)
if type_str != NULL:
return type_str.decode('UTF-8')

@property
def layout(self):
return CLayout(self, <uintptr_t>&self._ptr.layout)

@property
def length(self):
return self._ptr.length
@@ -1183,10 +1274,7 @@ cdef class CArrayView:

@property
def n_buffers(self):
-        for i in range(3):
-            if self._ptr.layout.buffer_type[i] == NANOARROW_BUFFER_TYPE_NONE:
-                return i
-        return 3
+        return self.layout.n_buffers

def buffer_type(self, int64_t i):
if i < 0 or i >= self.n_buffers:
@@ -1220,7 +1308,7 @@ cdef class CArrayView:
raise RuntimeError(f"ArrowArrayView buffer {i} has size_bytes < 0")

return CBufferView(
-            self._base,
+            self._array_base,
<uintptr_t>buffer_view.data.data,
buffer_view.size_bytes,
self._ptr.layout.buffer_data_type[i],
@@ -1247,19 +1335,21 @@ cdef class CArrayView:
return _repr_utils.array_view_repr(self)

@staticmethod
-    def from_cpu_array(CArray array):
+    def from_schema(CSchema schema):
cdef ArrowArrayView* c_array_view
base = alloc_c_array_view(&c_array_view)

cdef Error error = Error()
cdef int code = ArrowArrayViewInitFromSchema(c_array_view,
-                                                     array._schema._ptr, &error.c_error)
+                                                     schema._ptr, &error.c_error)
error.raise_message_not_ok("ArrowArrayViewInitFromSchema()", code)

-        code = ArrowArrayViewSetArray(c_array_view, array._ptr, &error.c_error)
-        error.raise_message_not_ok("ArrowArrayViewSetArray()", code)
+        return CArrayView(base, <uintptr_t>c_array_view)

-        return CArrayView((base, array), <uintptr_t>c_array_view)
+    @staticmethod
+    def from_array(CArray array, CDevice device=CDEVICE_CPU):
+        out = CArrayView.from_schema(array._schema)
+        return out._set_array(array, device)


cdef class SchemaMetadata:
@@ -1373,6 +1463,14 @@ cdef class CBufferView:
return value

def __iter__(self):
return self._iter_dispatch(0, len(self))

def _iter_dispatch(self, int64_t offset, int64_t length):
if offset < 0 or length < 0 or (offset + length) > len(self):
raise IndexError(
f"offset {offset} and length {length} do not describe a valid slice "
f"of buffer with length {len(self)}"
)
# memoryview's implementation is very fast but not always possible (half float, fixed-size binary, interval)
if self._data_type in (
NANOARROW_TYPE_HALF_FLOAT,
@@ -1383,11 +1481,14 @@ cdef class CBufferView:
) or (
self._data_type == NANOARROW_TYPE_BINARY and self._element_size_bits != 0
):
-            return self._iter_struct()
+            return self._iter_struct(offset, length)
else:
-            return iter(memoryview(self))
+            return self._iter_memoryview(offset, length)

-    def _iter_struct(self):
+    def _iter_memoryview(self, int64_t offset, int64_t length):
+        return iter(memoryview(self)[offset:(offset + length)])
+
+    def _iter_struct(self, int64_t offset, int64_t length):
for value in iter_unpack(self.format, self):
if len(value) == 1:
yield value[0]
@@ -1409,19 +1510,41 @@ cdef class CBufferView:
else:
return self[i]

-    @property
-    def elements(self):
+    def elements(self, offset=0, length=None):
if length is None:
length = self.n_elements

if offset < 0 or length < 0 or (offset + length) > self.n_elements:
raise IndexError(
f"offset {offset} and length {length} do not describe a valid slice "
f"of buffer with {self.n_elements} elements"
)

if self._data_type == NANOARROW_TYPE_BOOL:
-            return self._iter_bitmap()
+            return self._iter_bitmap(offset, length)
else:
-            return self.__iter__()
+            return self._iter_dispatch(offset, length)

-    def _iter_bitmap(self):
+    def _iter_bitmap(self, int64_t offset, int64_t length):
        cdef uint8_t item
-        for i in range(self._shape):
-            item = self._ptr.data.as_uint8[i]
-            for j in range(8):
-                yield (item & (<uint8_t>1 << j)) != 0
+        cdef int64_t i
+
+        if offset % 8 == 0:
+            first_byte = offset // 8
+            last_byte = self._shape
+            i = 0
+
+            for byte_i in range(first_byte, last_byte):
+                item = self._ptr.data.as_uint8[byte_i]
+                for j in range(8):
+                    yield (item & (<uint8_t>1 << j)) != 0
+                    i += 1
+                    if i >= length:
+                        return
+        else:
+            for i in range(length):
+                yield ArrowBitGet(self._ptr.data.as_uint8, offset + i) != 0


cdef Py_ssize_t _item_size(self):
if self._element_size_bits < 8:
@@ -1611,10 +1734,9 @@ cdef class CBuffer:
self._assert_valid()
return self._view.element(i)

-    @property
-    def elements(self):
+    def elements(self, offset=0, length=None):
self._assert_valid()
-        return self._view.elements
+        return self._view.elements(offset, length)

def __getbuffer__(self, Py_buffer* buffer, int flags):
self._assert_valid()
@@ -1921,16 +2043,44 @@ cdef class CArrayStream:
cdef ArrowArrayStream* _ptr
cdef object _cached_schema

def __cinit__(self, object base, uintptr_t addr):
self._base = base
self._ptr = <ArrowArrayStream*>addr
self._cached_schema = None

@staticmethod
def allocate():
cdef ArrowArrayStream* c_array_stream_out
base = alloc_c_array_stream(&c_array_stream_out)
return CArrayStream(base, <uintptr_t>c_array_stream_out)

-    def __cinit__(self, object base, uintptr_t addr):
-        self._base = base
-        self._ptr = <ArrowArrayStream*>addr
-        self._cached_schema = None
@staticmethod
def from_array_list(arrays, CSchema schema, move=False, validate=True):
cdef ArrowArrayStream* c_array_stream_out
base = alloc_c_array_stream(&c_array_stream_out)

if not move:
schema = schema.__deepcopy__()

cdef int code = ArrowBasicArrayStreamInit(c_array_stream_out, schema._ptr, len(arrays))
Error.raise_error_not_ok("ArrowBasicArrayStreamInit()", code)

cdef ArrowArray tmp
cdef CArray array
for i in range(len(arrays)):
array = arrays[i]
if not move:
c_array_shallow_copy(array._base, array._ptr, &tmp)
ArrowBasicArrayStreamSetArray(c_array_stream_out, i, &tmp)
else:
ArrowBasicArrayStreamSetArray(c_array_stream_out, i, array._ptr)

cdef Error error = Error()
if validate:
code = ArrowBasicArrayStreamValidate(c_array_stream_out, &error.c_error)
error.raise_message_not_ok("ArrowBasicArrayStreamValidate()", code)

return CArrayStream(base, <uintptr_t>c_array_stream_out)

def release(self):
if self.is_valid():