Skip to content

Commit a0ff949

Browse files
committed
Support numba expr for ndarray with chunks=blocks=shape
1 parent d5c0993 commit a0ff949

File tree

2 files changed

+111
-69
lines changed

2 files changed

+111
-69
lines changed

blosc2/blosc2_ext.pyx

+94-48
Original file line numberDiff line numberDiff line change
@@ -484,6 +484,7 @@ cdef extern from "b2nd.h":
484484
int b2nd_copy(b2nd_context_t *ctx, b2nd_array_t *src, b2nd_array_t **array)
485485
int b2nd_from_schunk(blosc2_schunk *schunk, b2nd_array_t **array)
486486

487+
void blosc2_unidim_to_multidim(uint8_t ndim, int64_t *shape, int64_t i, int64_t *index)
487488

488489

489490
ctypedef struct user_filters_udata:
@@ -498,6 +499,14 @@ ctypedef struct filler_udata:
498499
int output_cdtype
499500
int32_t chunkshape
500501

502+
ctypedef struct numba_udata:
503+
char* py_func
504+
uintptr_t inputs_id
505+
int output_cdtype
506+
int32_t chunkshape
507+
int ndim
508+
int32_t *blockshape
509+
int64_t *shape
501510

502511
MAX_TYPESIZE = BLOSC_MAX_TYPESIZE
503512
MAX_BUFFERSIZE = BLOSC2_MAX_BUFFERSIZE
@@ -1454,35 +1463,6 @@ cdef class SChunk:
14541463
raise RuntimeError("Could not create compression context")
14551464

14561465

1457-
def _set_aux_numba(self, func, inputs_id, dtype_output):
1458-
if self.schunk.storage.cparams.nthreads > 1:
1459-
raise AttributeError("compress `nthreads` must be 1 when assigning a prefilter")
1460-
1461-
func_id = func.__name__
1462-
blosc2.prefilter_funcs[func_id] = func
1463-
func_id = func_id.encode("utf-8") if isinstance(func_id, str) else func_id
1464-
1465-
# Set prefilter
1466-
cdef blosc2_cparams* cparams = self.schunk.storage.cparams
1467-
cparams.prefilter = <blosc2_prefilter_fn> general_numba
1468-
1469-
cdef blosc2_prefilter_params* preparams = <blosc2_prefilter_params *> malloc(sizeof(blosc2_prefilter_params))
1470-
cdef filler_udata* fill_udata = <filler_udata *> malloc(sizeof(filler_udata))
1471-
fill_udata.py_func = <char *> malloc(strlen(func_id) + 1)
1472-
strcpy(fill_udata.py_func, func_id)
1473-
fill_udata.inputs_id = inputs_id
1474-
fill_udata.output_cdtype = np.dtype(dtype_output).num
1475-
fill_udata.chunkshape = self.schunk.chunksize // self.schunk.typesize
1476-
1477-
preparams.user_data = fill_udata
1478-
cparams.preparams = preparams
1479-
_check_cparams(cparams)
1480-
1481-
blosc2_free_ctx(self.schunk.cctx)
1482-
self.schunk.cctx = blosc2_create_cctx(dereference(cparams))
1483-
if self.schunk.cctx == NULL:
1484-
raise RuntimeError("Could not create compression context")
1485-
14861466
def _set_prefilter(self, func, dtype_input, dtype_output=None):
14871467
if self.schunk.storage.cparams.nthreads > 1:
14881468
raise AttributeError("compress `nthreads` must be 1 when assigning a prefilter")
@@ -1576,30 +1556,63 @@ cdef int general_filler(blosc2_prefilter_params *params):
15761556

15771557

15781558
cdef int general_numba(blosc2_prefilter_params *params):
1579-
cdef filler_udata *udata = <filler_udata *> params.user_data
1580-
cdef int nd = 1
1581-
cdef np.npy_intp dims = params.output_size // params.output_typesize
1559+
cdef numba_udata *udata = <numba_udata *> params.user_data
1560+
cdef int nd = udata.ndim
1561+
# off = ??? segurament també caldrà canviar-ho
1562+
cdef int64_t offset = params.nchunk * udata.chunkshape + params.output_offset // params.output_typesize
1563+
print("out_offset: ", params.output_offset, " offset calculat ", offset)
1564+
#shape normal, no extshape, revisar-ho quan hi haja padding
1565+
cdef int64_t offset_ndim[B2ND_MAX_DIM]
1566+
blosc2_unidim_to_multidim(nd, udata.shape, offset, offset_ndim)
1567+
cdef np.npy_intp dims[B2ND_MAX_DIM]
1568+
dims = udata.blockshape # Açò canviarà quan hi haja padding
1569+
# params.output_size // params.output_typesize
1570+
1571+
#output = np.PyArray_SimpleNewFromData(nd, dims, udata.output_cdtype, <void*>params.output)
15821572

15831573
inputs_tuple = _ctypes.PyObj_FromPtr(udata.inputs_id)
1584-
1585-
output = np.PyArray_SimpleNewFromData(nd, &dims, udata.output_cdtype, <void*>params.output)
1586-
offset = params.nchunk * udata.chunkshape + params.output_offset // params.output_typesize
1587-
15881574
inputs = []
1589-
for obj, dtype in inputs_tuple:
1590-
if isinstance(obj, blosc2.SChunk):
1591-
out = np.empty(dims, dtype=dtype)
1592-
obj.get_slice(start=offset, stop=offset + dims, out=out)
1593-
inputs.append(out)
1594-
elif isinstance(obj, np.ndarray):
1595-
inputs.append(obj[offset : offset + dims])
1596-
elif isinstance(obj, (int, float, bool, complex)):
1597-
inputs.append(np.full(dims, obj, dtype=dtype))
1598-
else:
1599-
raise ValueError("Unsupported operand")
1575+
blockshape = [udata.blockshape[i] for i in range(nd)]
1576+
1577+
if nd == 1:
1578+
# Enviar-ho a fer la mà quan nd = 1 ? o ho puc suportar sense problemes??
1579+
for obj, dtype in inputs_tuple:
1580+
if isinstance(obj, blosc2.SChunk):
1581+
out = np.empty(udata.blockshape[0], dtype=dtype)
1582+
obj.get_slice(start=offset, stop=offset + dims[0], out=out)
1583+
inputs.append(out)
1584+
elif isinstance(obj, np.ndarray):
1585+
inputs.append(obj[offset : offset + dims[0]])
1586+
elif isinstance(obj, (int, float, bool, complex)):
1587+
inputs.append(np.full(udata.blockshape[0], obj, dtype=dtype))
1588+
else:
1589+
raise ValueError("Unsupported operand")
1590+
else:
1591+
# Get tuple of slices
1592+
l = []
1593+
for i in range(nd):
1594+
l.append(slice(offset_ndim[i], offset_ndim[i] + udata.blockshape[i]))
1595+
slices = tuple(l)
1596+
for obj, dtype in inputs_tuple:
1597+
if isinstance(obj, blosc2.SChunk):
1598+
raise ValueError("Cannot mix unidim operands with multidim") # o sí?
1599+
elif isinstance(obj, np.ndarray):
1600+
inputs.append(obj[slices])
1601+
elif isinstance(obj, np.ndarray):
1602+
inputs.append(obj[slices])
1603+
elif isinstance(obj, (int, float, bool, complex)):
1604+
inputs.append(np.full(blockshape, obj, dtype=dtype))
1605+
else:
1606+
raise ValueError("Unsupported operand")
16001607

16011608
func_id = udata.py_func.decode("utf-8")
1602-
blosc2.prefilter_funcs[func_id](tuple(inputs), output, offset)
1609+
out = np.empty(blockshape, dtype)
1610+
blosc2.prefilter_funcs[func_id](tuple(inputs), out, offset)
1611+
1612+
cdef Py_buffer *buf = <Py_buffer *> malloc(sizeof(Py_buffer))
1613+
PyObject_GetBuffer(out, buf, PyBUF_SIMPLE)
1614+
memcpy(params.output, buf.buf, buf.len)
1615+
PyBuffer_Release(buf)
16031616

16041617
return 0
16051618

@@ -2226,6 +2239,39 @@ cdef class NDArray:
22262239
if self.array.shape[0] == 1 and self.ndim == 1:
22272240
self.array.ndim = 0
22282241

2242+
2243+
def _set_aux_numba(self, func, inputs_id):
2244+
if self.array.sc.storage.cparams.nthreads > 1:
2245+
raise AttributeError("compress `nthreads` must be 1 when assigning a prefilter")
2246+
2247+
func_id = func.__name__
2248+
blosc2.prefilter_funcs[func_id] = func
2249+
func_id = func_id.encode("utf-8") if isinstance(func_id, str) else func_id
2250+
2251+
# Set prefilter
2252+
cdef blosc2_cparams* cparams = self.array.sc.storage.cparams
2253+
cparams.prefilter = <blosc2_prefilter_fn> general_numba
2254+
2255+
cdef blosc2_prefilter_params* preparams = <blosc2_prefilter_params *> malloc(sizeof(blosc2_prefilter_params))
2256+
cdef numba_udata* pref_udata = <numba_udata *> malloc(sizeof(numba_udata))
2257+
pref_udata.py_func = <char *> malloc(strlen(func_id) + 1)
2258+
strcpy(pref_udata.py_func, func_id)
2259+
pref_udata.inputs_id = inputs_id
2260+
pref_udata.output_cdtype = np.dtype(self.dtype).num
2261+
pref_udata.chunkshape = self.array.sc.chunksize // self.array.sc.typesize
2262+
pref_udata.ndim = self.ndim
2263+
pref_udata.blockshape = self.array.blockshape
2264+
pref_udata.shape = self.array.shape
2265+
2266+
preparams.user_data = pref_udata
2267+
cparams.preparams = preparams
2268+
_check_cparams(cparams)
2269+
2270+
blosc2_free_ctx(self.array.sc.cctx)
2271+
self.array.sc.cctx = blosc2_create_cctx(dereference(cparams))
2272+
if self.array.sc.cctx == NULL:
2273+
raise RuntimeError("Could not create compression context")
2274+
22292275
def __dealloc__(self):
22302276
if self.array != NULL:
22312277
_check_rc(b2nd_free(self.array), "Error while freeing the array")

blosc2/lazyexpr.py

+17-21
Original file line numberDiff line numberDiff line change
@@ -510,33 +510,29 @@ def do_slices_intersect(slice1, slice2):
510510

511511

512512
class NumbaExpr:
513-
def __init__(self, func, inputs_tuple, schunk_dtype):
514-
# Suposem que tots els operands tenen els mateix shape (ara per ara) i que són schunks, ja o
515-
# canviarem més endavant
513+
def __init__(self, func, inputs_tuple, dtype, shape):
514+
# Suposem que tots els operands tenen els mateix shape (ara per ara)
516515
self.inputs_tuple = inputs_tuple # Keep reference to evict lost reference
517-
op1 = inputs_tuple[0][0]
518-
cparams = {'typesize': np.dtype(schunk_dtype).itemsize, 'nthreads': 1}
519-
self.nbytes = op1.size * cparams['typesize']
520-
self.res = blosc2.SChunk(chunksize=self.nbytes, cparams=cparams)
521-
self.res._set_aux_numba(func, id(inputs_tuple), schunk_dtype)
522-
self.schunk_dtype = schunk_dtype # Quan siga amb ndarray açò ja no caldrà
516+
if shape is None:
517+
for obj, dtype in inputs_tuple:
518+
if isinstance(obj, (np.ndarray, blosc2.NDArray)):
519+
# Get res shape
520+
self.shape = obj.shape
521+
break
522+
cparams = {'nthreads': 1}
523+
# canviar això de nthreads
524+
self.res = blosc2.empty(self.shape, dtype, cparams=cparams, chunks=self.shape, blocks=self.shape)
525+
self.res._set_aux_numba(func, id(inputs_tuple))
523526
self.func = func
524527

525528
def eval(self):
529+
aux = np.zeros(self.res.shape, self.res.dtype)
530+
self.res[...] = aux
531+
self.res.schunk.remove_prefilter(self.func.__name__)
526532

527-
chunksize = self.res.chunksize
528-
written_nbytes = 0
529-
while written_nbytes < self.nbytes:
530-
chunk = np.zeros(chunksize // self.res.typesize, dtype=self.schunk_dtype)
531-
self.res.append_data(chunk)
532-
written_nbytes += chunksize
533-
if (self.nbytes - written_nbytes) < self.res.chunksize:
534-
chunksize = self.nbytes - written_nbytes
535-
536-
self.res.remove_prefilter(self.func.__name__)
537533
return self.res
538534

539535

540536
# inputs_tuple = ( (operand, dtype), (operand2, dtype2), ... )
541-
def expr_from_udf(func, inputs_tuple, dtype):
542-
return NumbaExpr(func, inputs_tuple, dtype)
537+
def expr_from_udf(func, inputs_tuple, dtype, shape=None):
538+
return NumbaExpr(func, inputs_tuple, dtype, shape)

0 commit comments

Comments
 (0)