-
-
Notifications
You must be signed in to change notification settings - Fork 323
/
Copy pathcpu.py
227 lines (175 loc) · 6.97 KB
/
cpu.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
from __future__ import annotations
from typing import (
TYPE_CHECKING,
Any,
Literal,
)
import numpy as np
import numpy.typing as npt
from zarr.core.buffer import core
from zarr.registry import (
register_buffer,
register_ndbuffer,
)
if TYPE_CHECKING:
from collections.abc import Callable, Iterable
from typing import Self
from zarr.core.buffer.core import ArrayLike, NDArrayLike
from zarr.core.common import BytesLike
class Buffer(core.Buffer):
    """A flat, contiguous block of memory.

    Zarr uses Buffer wherever it needs to represent a contiguous block of
    memory. A Buffer wraps an underlying array-like instance that holds the
    memory. The kind of memory is not prescribed: it can be regular host
    memory, CUDA device memory, or something else. The one requirement is
    that the array-like instance can be copied/converted to a regular NumPy
    array (host memory).

    Notes
    -----
    The buffer is untyped, so every index and size is expressed in bytes.

    Parameters
    ----------
    array_like
        Array-like object that must be 1-dimensional, contiguous, and of
        byte dtype.
    """

    def __init__(self, array_like: ArrayLike) -> None:
        super().__init__(array_like)

    @classmethod
    def create_zero_length(cls) -> Self:
        """Create a buffer wrapping an empty NumPy array of dtype ``"b"``.

        Returns
        -------
        New buffer of length zero
        """
        empty = np.array([], dtype="b")
        return cls(empty)

    @classmethod
    def from_buffer(cls, buffer: core.Buffer) -> Self:
        """Create a new buffer from an existing Buffer

        Use this to make sure an existing buffer is an instance of the
        desired Buffer subclass. E.g., MemoryStore uses it to return a
        buffer instance of the subclass specified by its BufferPrototype
        argument.

        This implementation obtains the content of `buffer` through its
        `as_numpy_array()` method and wraps the result with
        `from_array_like`.

        Parameters
        ----------
        buffer
            buffer object.

        Returns
        -------
        A new buffer representing the content of the input buffer

        Notes
        -----
        Subclasses of `Buffer` must override this method to implement
        more optimal conversions that avoid copies where possible
        """
        host_array = buffer.as_numpy_array()
        return cls.from_array_like(host_array)

    @classmethod
    def from_bytes(cls, bytes_like: BytesLike) -> Self:
        """Create a new buffer from a bytes-like object (host memory)

        Parameters
        ----------
        bytes_like
            bytes-like object

        Returns
        -------
        New buffer representing `bytes_like`
        """
        # Interpret the bytes-like object as a 1-dim array of dtype "b".
        byte_view = np.frombuffer(bytes_like, dtype="b")
        return cls.from_array_like(byte_view)

    def as_numpy_array(self) -> npt.NDArray[Any]:
        """Return the content of this buffer as a NumPy array (host memory).

        Notes
        -----
        Might have to copy data, consider using `.as_array_like()` instead.

        Returns
        -------
        NumPy array of this buffer (might be a data copy)
        """
        host_array = np.asanyarray(self._data)
        return host_array

    def __add__(self, other: core.Buffer) -> Self:
        """Concatenate this buffer with `other` and return the result as a new buffer"""
        rhs = other.as_array_like()
        assert rhs.dtype == np.dtype("b")
        pieces = (np.asanyarray(self._data), np.asanyarray(rhs))
        return self.__class__(np.concatenate(pieces))
class NDBuffer(core.NDBuffer):
    """An n-dimensional block of memory.

    Zarr uses NDBuffer wherever it needs to represent an n-dimensional memory
    block. An NDBuffer wraps an underlying ndarray-like instance that holds
    the memory. The kind of memory is not prescribed: it can be regular host
    memory, CUDA device memory, or something else. The one requirement is
    that the ndarray-like instance can be copied/converted to a regular NumPy
    array (host memory).

    Notes
    -----
    The two buffer classes Buffer and NDBuffer are very similar. In fact,
    Buffer is a special case of NDBuffer where dim=1, stride=1, and
    dtype="b". They are nevertheless defined as separate classes so that
    Python's type system can tell the contiguous Buffer apart from the n-dim
    (non-contiguous) NDBuffer.

    Parameters
    ----------
    array
        ndarray-like object that is convertible to a regular NumPy array.
    """

    def __init__(self, array: NDArrayLike) -> None:
        super().__init__(array)

    @classmethod
    def create(
        cls,
        *,
        shape: Iterable[int],
        dtype: npt.DTypeLike,
        order: Literal["C", "F"] = "C",
        fill_value: Any | None = None,
    ) -> Self:
        """Create a new NumPy-backed buffer with the given shape and dtype.

        Parameters
        ----------
        shape
            Extent of every dimension; converted to a tuple before use.
        dtype
            Data type of the new array.
        order
            Memory layout passed on to NumPy, "C" or "F".
        fill_value
            Value every element is set to. When None, the array is
            zero-filled.

        Returns
        -------
        New buffer wrapping the freshly allocated array
        """
        dims = tuple(shape)
        if fill_value is not None:
            return cls(np.full(shape=dims, fill_value=fill_value, dtype=dtype, order=order))
        return cls(np.zeros(shape=dims, dtype=dtype, order=order))

    @classmethod
    def from_numpy_array(cls, array_like: npt.ArrayLike) -> Self:
        """Create a new buffer from anything `np.asanyarray` can convert.

        Parameters
        ----------
        array_like
            Object to convert to a NumPy array.

        Returns
        -------
        New buffer wrapping the converted array
        """
        converted = np.asanyarray(array_like)
        return cls.from_ndarray_like(converted)

    def as_numpy_array(self) -> npt.NDArray[Any]:
        """Return the content of this buffer as a NumPy array (host memory).

        Warnings
        --------
        Might have to copy data, consider using `.as_ndarray_like()` instead.

        Returns
        -------
        NumPy array of this buffer (might be a data copy)
        """
        host_array = np.asanyarray(self._data)
        return host_array

    def __getitem__(self, key: Any) -> Self:
        # Index the wrapped array and wrap the selection in a new buffer.
        selection = self._data[key]
        return self.__class__(np.asanyarray(selection))

    def __setitem__(self, key: Any, value: Any) -> None:
        # Unwrap NDBuffer values so the wrapped array receives the raw data.
        payload = value._data if isinstance(value, NDBuffer) else value
        self._data[key] = payload
def as_numpy_array_wrapper(
    func: Callable[[npt.NDArray[Any]], bytes], buf: core.Buffer, prototype: core.BufferPrototype
) -> core.Buffer:
    """Call `func` on `buf` as a NumPy array and wrap the returned bytes in a `Buffer`.

    Use this wrapper for a `func` that only supports host memory, such as
    `GZip.decode` and `Blosc.decode`: the input `buf` is converted to a NumPy
    array, handed to `func`, and the result is converted back into a `Buffer`.

    Parameters
    ----------
    func
        The callable that is invoked with the converted `buf` as its input.
        `func` must return bytes, which are converted into a `Buffer`
        before being returned.
    buf
        The buffer that is converted to a NumPy array before it is given to
        `func`.
    prototype
        The prototype of the output buffer.

    Returns
    -------
    The result of `func` converted to a `Buffer`
    """
    # Resolve the output constructor first, then convert the input and call `func`.
    to_buffer = prototype.buffer.from_bytes
    host_input = buf.as_numpy_array()
    raw_result = func(host_input)
    return to_buffer(raw_result)
# CPU buffer prototype: pairs this module's NumPy-backed Buffer and NDBuffer classes
buffer_prototype = core.BufferPrototype(buffer=Buffer, nd_buffer=NDBuffer)
# NOTE(review): disabled alias kept from the original; confirm whether it can be removed
# default_buffer_prototype = buffer_prototype
# The NumPy prototype, used e.g. when reading the shard index
def numpy_buffer_prototype() -> core.BufferPrototype:
    """Build a buffer prototype from this module's `Buffer` and `NDBuffer` classes.

    Returns
    -------
    A `BufferPrototype` instance constructed on every call
    """
    prototype = core.BufferPrototype(buffer=Buffer, nd_buffer=NDBuffer)
    return prototype
# Register both classes with the zarr registry at import time, each under an
# explicit qualified name.
register_buffer(Buffer, qualname="zarr.buffer.cpu.Buffer")
register_ndbuffer(NDBuffer, qualname="zarr.buffer.cpu.NDBuffer")