-
-
Notifications
You must be signed in to change notification settings - Fork 327
/
Copy pathbytes.py
122 lines (95 loc) · 3.74 KB
/
bytes.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
from __future__ import annotations
import sys
from dataclasses import dataclass, replace
from enum import Enum
from typing import TYPE_CHECKING
import numpy as np
from zarr.abc.codec import ArrayBytesCodec
from zarr.core.buffer import Buffer, NDArrayLike, NDBuffer
from zarr.core.common import JSON, parse_enum, parse_named_configuration
from zarr.registry import register_codec
if TYPE_CHECKING:
from typing import Self
from zarr.core.array_spec import ArraySpec
class Endian(Enum):
    """
    Byte orders understood by the bytes codec.

    Each member's value is the same string as its name, so a member can be
    obtained from a plain string such as ``sys.byteorder`` via ``Endian(value)``.
    """

    # Most significant byte first.
    big = "big"
    # Least significant byte first.
    little = "little"
# Byte order of the platform this interpreter runs on (from `sys.byteorder`),
# resolved once at import time.
default_system_endian: Endian = Endian(sys.byteorder)
@dataclass(frozen=True)
class BytesCodec(ArrayBytesCodec):
    """
    Array-to-bytes codec that stores chunk data as raw bytes in a chosen byte order.

    Parameters
    ----------
    endian : Endian, str or None, optional
        Byte order of the encoded bytes. Strings are converted to ``Endian``
        members with ``parse_enum``. ``None`` means that no byte order is
        configured; ``evolve_from_array_spec`` selects it for data types whose
        ``itemsize`` is 0 and rejects it for all other data types.
        Defaults to ``default_system_endian``.
    """

    # Plain class attribute (unannotated, so it is not a dataclass field).
    is_fixed_size = True

    endian: Endian | None

    def __init__(self, *, endian: Endian | str | None = default_system_endian) -> None:
        # `None` has to be accepted here: `evolve_from_array_spec` creates a
        # codec with `endian=None` through `dataclasses.replace`, which goes
        # through `__init__` again.
        endian_parsed = None if endian is None else parse_enum(endian, Endian)

        # The dataclass is frozen, so assign through `object.__setattr__`.
        object.__setattr__(self, "endian", endian_parsed)

    @classmethod
    def from_dict(cls, data: dict[str, JSON]) -> Self:
        """
        Build a codec from its named-configuration dictionary.

        Parameters
        ----------
        data : dict[str, JSON]
            Dictionary that is validated against the codec name ``"bytes"``.
            The ``"configuration"`` entry is optional so that the output of
            ``to_dict`` for a codec without a byte order can be read back;
            when it is absent the constructor defaults are used.

        Returns
        -------
        Self
        """
        _, configuration_parsed = parse_named_configuration(
            data, "bytes", require_configuration=False
        )
        # No configuration -> fall back to the constructor defaults.
        configuration_parsed = configuration_parsed or {}
        return cls(**configuration_parsed)  # type: ignore[arg-type]

    def to_dict(self) -> dict[str, JSON]:
        """
        Serialize the codec to its named-configuration dictionary.

        Returns
        -------
        dict[str, JSON]
            ``{"name": "bytes"}`` when no byte order is configured, otherwise
            the same dictionary with a ``"configuration"`` entry holding the
            ``"endian"`` value.
        """
        if self.endian is None:
            return {"name": "bytes"}
        return {"name": "bytes", "configuration": {"endian": self.endian.value}}

    def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self:
        """
        Adapt the codec to the data type of ``array_spec``.

        Parameters
        ----------
        array_spec : ArraySpec
            Specification whose ``dtype.itemsize`` decides whether a byte
            order is needed.

        Returns
        -------
        Self
            A copy with ``endian=None`` when the item size is 0 and a byte
            order was set, otherwise this codec unchanged.

        Raises
        ------
        ValueError
            If the item size is non-zero and no byte order is configured.
        """
        if array_spec.dtype.itemsize == 0:
            if self.endian is not None:
                return replace(self, endian=None)
        elif self.endian is None:
            raise ValueError(
                "The `endian` configuration needs to be specified for multi-byte data types."
            )
        return self

    async def _decode_single(
        self,
        chunk_bytes: Buffer,
        chunk_spec: ArraySpec,
    ) -> NDBuffer:
        """
        Reinterpret the bytes of one chunk as an array of the chunk's data type.

        Parameters
        ----------
        chunk_bytes : Buffer
            Encoded chunk.
        chunk_spec : ArraySpec
            Supplies the data type, the target shape and the buffer prototype.

        Returns
        -------
        NDBuffer
            Array view of the bytes, reshaped to ``chunk_spec.shape`` if needed.
        """
        assert isinstance(chunk_bytes, Buffer)
        if chunk_spec.dtype.itemsize > 0:
            # Anything other than `Endian.little` (including `None`) selects
            # the big-endian prefix.
            if self.endian == Endian.little:
                prefix = "<"
            else:
                prefix = ">"
            # `dtype.str[1:]` drops the leading byte-order character so it can
            # be replaced by the codec's own prefix.
            dtype = np.dtype(f"{prefix}{chunk_spec.dtype.str[1:]}")
        else:
            dtype = np.dtype(f"|{chunk_spec.dtype.str[1:]}")

        as_array_like = chunk_bytes.as_array_like()
        if isinstance(as_array_like, NDArrayLike):
            as_nd_array_like = as_array_like
        else:
            as_nd_array_like = np.asanyarray(as_array_like)
        chunk_array = chunk_spec.prototype.nd_buffer.from_ndarray_like(
            as_nd_array_like.view(dtype=dtype)
        )

        # ensure correct chunk shape
        if chunk_array.shape != chunk_spec.shape:
            chunk_array = chunk_array.reshape(
                chunk_spec.shape,
            )
        return chunk_array

    async def _encode_single(
        self,
        chunk_array: NDBuffer,
        chunk_spec: ArraySpec,
    ) -> Buffer | None:
        """
        Serialize one chunk array to a flat byte buffer.

        Parameters
        ----------
        chunk_array : NDBuffer
            Chunk to encode.
        chunk_spec : ArraySpec
            Supplies the buffer prototype used to wrap the result.

        Returns
        -------
        Buffer
            The chunk's data as bytes, converted to the configured byte order
            first when one is set and differs from the array's byte order.
        """
        assert isinstance(chunk_array, NDBuffer)
        if (
            chunk_array.dtype.itemsize > 1
            # Without a configured byte order there is nothing to convert to.
            and self.endian is not None
            and self.endian != chunk_array.byteorder
        ):
            # type-ignore is a numpy bug
            # see https://github.com/numpy/numpy/issues/26473
            new_dtype = chunk_array.dtype.newbyteorder(self.endian.name)  # type: ignore[arg-type]
            chunk_array = chunk_array.astype(new_dtype)

        nd_array = chunk_array.as_ndarray_like()
        # Flatten the nd-array (only copy if needed) and reinterpret as bytes
        nd_array = nd_array.ravel().view(dtype="b")
        return chunk_spec.prototype.buffer.from_array_like(nd_array)

    def compute_encoded_size(self, input_byte_length: int, _chunk_spec: ArraySpec) -> int:
        """
        Return the encoded size, which equals ``input_byte_length`` for this codec.

        ``_chunk_spec`` is accepted but not used.
        """
        return input_byte_length
# Register the codec under its current name.
register_codec("bytes", BytesCodec)
# Also register it as "endian" for compatibility with earlier versions of ZEP1.
register_codec("endian", BytesCodec)