forked from zarr-developers/zarr-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcommon.py
206 lines (157 loc) · 6.27 KB
/
common.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
from __future__ import annotations
import asyncio
import functools
import operator
import warnings
from collections.abc import Iterable, Mapping
from enum import Enum
from itertools import starmap
from typing import (
TYPE_CHECKING,
Any,
Literal,
TypeVar,
cast,
overload,
)
import numpy as np
from zarr.core.config import config as zarr_config
from zarr.core.strings import _STRING_DTYPE
if TYPE_CHECKING:
from collections.abc import Awaitable, Callable, Iterator
# String constants holding file names.
# NOTE(review): presumably these are the names of the metadata documents
# ("zarr.json" for Zarr format 3, the dotted names for format 2) — confirm
# against the code that reads/writes them.
ZARR_JSON = "zarr.json"
ZARRAY_JSON = ".zarray"
ZGROUP_JSON = ".zgroup"
ZATTRS_JSON = ".zattrs"
ZMETADATA_V2_JSON = ".zmetadata"

# Type aliases shared by the helpers in this module.
# Any of the three built-in binary buffer types.
BytesLike = bytes | bytearray | memoryview
# Either a tuple of ints or a single int (see parse_shapelike for normalisation).
ShapeLike = tuple[int, ...] | int
# A tuple of ints of arbitrary length.
ChunkCoords = tuple[int, ...]
# Any iterable of ints.
ChunkCoordsLike = Iterable[int]
# The literal integers 2 or 3.
ZarrFormat = Literal[2, 3]
# The literal strings "array" or "group".
NodeType = Literal["array", "group"]
# Recursive alias: scalars, None, string-keyed mappings of JSON and tuples of JSON.
JSON = str | int | float | Mapping[str, "JSON"] | tuple["JSON", ...] | None
# The literal strings "C" or "F" (the values accepted by parse_order).
MemoryOrder = Literal["C", "F"]
# The literal mode strings "r", "r+", "a", "w" and "w-".
AccessModeLiteral = Literal["r", "r+", "a", "w", "w-"]
def product(tup: ChunkCoords) -> int:
    """Multiply together every entry of ``tup``.

    Parameters
    ----------
    tup : ChunkCoords
        The values to multiply.

    Returns
    -------
    int
        The accumulated product. Accumulation starts at 1, so an empty
        ``tup`` yields 1.
    """
    total = 1
    for extent in tup:
        total = operator.mul(total, extent)
    return total
# T: type variable bounded by tuple types; annotates the items consumed by concurrent_map.
T = TypeVar("T", bound=tuple[Any, ...])
# V: unbounded type variable; annotates the awaited result type in concurrent_map.
V = TypeVar("V")
async def concurrent_map(
    items: Iterable[T],
    func: Callable[..., Awaitable[V]],
    limit: int | None = None,
) -> list[V]:
    """Call ``func(*item)`` for every item and await all of the results together.

    Parameters
    ----------
    items : Iterable[T]
        Argument tuples; each one is unpacked into a call to ``func``.
    func : Callable[..., Awaitable[V]]
        Callable returning an awaitable.
    limit : int | None, optional
        When given, an ``asyncio.Semaphore(limit)`` is acquired around every
        call, so at most ``limit`` calls hold the semaphore at a time. When
        ``None`` no semaphore is used.

    Returns
    -------
    list[V]
        The value produced by ``asyncio.gather`` over all calls.
    """
    if limit is not None:
        gate = asyncio.Semaphore(limit)

        async def throttled(args: tuple[Any]) -> V:
            # Hold the semaphore for the whole duration of the wrapped call.
            async with gate:
                return await func(*args)

        scheduled = [asyncio.ensure_future(throttled(args)) for args in items]
        return await asyncio.gather(*scheduled)

    # Unbounded case: create every awaitable up front and gather them in one go.
    awaitables = [func(*args) for args in items]
    return await asyncio.gather(*awaitables)
# E: type variable bounded by Enum; annotates the enum class handled by enum_names and parse_enum.
E = TypeVar("E", bound=Enum)
def enum_names(enum: type[E]) -> Iterator[str]:
    """Lazily yield the ``name`` of each member produced by iterating ``enum``.

    Parameters
    ----------
    enum : type[E]
        The enum class to iterate over.

    Yields
    ------
    str
        Member names, in iteration order.
    """
    yield from (member.name for member in enum)
def parse_enum(data: object, cls: type[E]) -> E:
    """Convert ``data`` into a member of the enum class ``cls``.

    Parameters
    ----------
    data : object
        Either an existing member of ``cls`` (returned unchanged) or a string.
    cls : type[E]
        The target enum class.

    Returns
    -------
    E
        ``data`` itself when it is already an instance of ``cls``, otherwise
        the result of ``cls(data)``.

    Raises
    ------
    TypeError
        If ``data`` is neither an instance of ``cls`` nor a ``str``.
    ValueError
        If ``data`` is a string that is not among the member names of ``cls``.
    """
    if isinstance(data, cls):
        return data
    if not isinstance(data, str):
        raise TypeError(f"Expected str, got {type(data)}")

    valid_names = list(enum_names(cls))
    if data not in valid_names:
        raise ValueError(f"Value must be one of {valid_names!r}. Got {data} instead.")
    # NOTE(review): membership is checked against member *names*, but ``cls(data)``
    # looks a member up by *value*; this is consistent only for enums whose names
    # equal their values — confirm that holds for every enum passed here.
    return cls(data)
def parse_name(data: JSON, expected: str | None = None) -> str:
    """Validate that ``data`` is a string, optionally matching ``expected``.

    Parameters
    ----------
    data : JSON
        The value to validate.
    expected : str | None, optional
        When not ``None``, ``data`` must compare equal to it.

    Returns
    -------
    str
        ``data``, unchanged.

    Raises
    ------
    TypeError
        If ``data`` is not a ``str``.
    ValueError
        If ``expected`` is given and ``data`` differs from it.
    """
    if not isinstance(data, str):
        raise TypeError(f"Expected a string, got an instance of {type(data)}.")
    if expected is not None and data != expected:
        raise ValueError(f"Expected '{expected}'. Got {data} instead.")
    return data
def parse_configuration(data: JSON) -> JSON:
    """Validate that ``data`` is a ``dict`` and hand it back unchanged.

    Parameters
    ----------
    data : JSON
        The value to validate.

    Returns
    -------
    JSON
        The same ``dict`` object that was passed in.

    Raises
    ------
    TypeError
        If ``data`` is not a ``dict``.
    """
    if isinstance(data, dict):
        return data
    raise TypeError(f"Expected dict, got {type(data)}")
@overload
def parse_named_configuration(
    data: JSON, expected_name: str | None = None
) -> tuple[str, dict[str, JSON]]: ...


@overload
def parse_named_configuration(
    data: JSON, expected_name: str | None = None, *, require_configuration: bool = True
) -> tuple[str, dict[str, JSON] | None]: ...


def parse_named_configuration(
    data: JSON, expected_name: str | None = None, *, require_configuration: bool = True
) -> tuple[str, JSON | None]:
    """Split a ``{"name": ..., "configuration": ...}`` dict into its two parts.

    Parameters
    ----------
    data : JSON
        The dict to parse.
    expected_name : str | None, optional
        Forwarded to ``parse_name`` as the expected value of the ``"name"`` entry.
    require_configuration : bool, optional
        When true (the default) a missing ``"configuration"`` key is an error;
        when false the configuration is reported as ``None`` instead.

    Returns
    -------
    tuple[str, JSON | None]
        The parsed name and the parsed configuration (or ``None``).

    Raises
    ------
    TypeError
        If ``data`` is not a ``dict``.
    ValueError
        If the ``"name"`` key is missing, or the ``"configuration"`` key is
        missing while ``require_configuration`` is true.
    """
    if not isinstance(data, dict):
        raise TypeError(f"Expected dict, got {type(data)}")
    if "name" not in data:
        raise ValueError(f"Named configuration does not have a 'name' key. Got {data}.")

    # The name is validated before the configuration is looked at.
    name = parse_name(data["name"], expected_name)

    if "configuration" not in data:
        if require_configuration:
            raise ValueError(
                f"Named configuration does not have a 'configuration' key. Got {data}."
            )
        return name, None
    return name, parse_configuration(data["configuration"])
def parse_shapelike(data: int | Iterable[int]) -> tuple[int, ...]:
    """Normalise an int or an iterable of ints into a tuple of non-negative ints.

    Parameters
    ----------
    data : int | Iterable[int]
        A single integer, or an iterable of integers.

    Returns
    -------
    tuple[int, ...]
        ``(data,)`` for a single integer, otherwise ``tuple(data)``.

    Raises
    ------
    TypeError
        If ``data`` is neither an ``int`` nor iterable, or if any element of
        the iterable is not an ``int``.
    ValueError
        If the integer, or any element of the iterable, is negative.
    """
    if isinstance(data, int):
        if data < 0:
            raise ValueError(f"Expected a non-negative integer. Got {data} instead")
        return (data,)

    try:
        dims = tuple(data)
    except TypeError as e:
        # tuple() raises TypeError for non-iterables; re-raise with a clearer message.
        raise TypeError(
            f"Expected an integer or an iterable of integers. Got {data} instead."
        ) from e

    if any(not isinstance(dim, int) for dim in dims):
        raise TypeError(f"Expected an iterable of integers. Got {data} instead.")
    if any(dim < 0 for dim in dims):
        raise ValueError(f"Expected all values to be non-negative. Got {data} instead.")
    return dims
def parse_fill_value(fill_value: Any, dtype: Any, zarr_format: ZarrFormat) -> Any:
    """Return the fill value to use, special-casing strings in Zarr format 2.

    Parameters
    ----------
    fill_value : Any
        The requested fill value.
    dtype : Any
        The requested data type; ``str`` and ``"str"`` are treated as the string dtype.
    zarr_format : ZarrFormat
        The Zarr format version.

    Returns
    -------
    Any
        ``""`` when ``zarr_format`` is 2, ``dtype`` is the string dtype and
        ``fill_value`` compares equal to 0; otherwise ``fill_value`` unchanged.
    """
    wants_v2_string = zarr_format == 2 and (dtype is str or dtype == "str")
    if wants_v2_string and fill_value == 0:
        return ""
    # todo: real validation
    return fill_value
def parse_order(data: Any) -> Literal["C", "F"]:
    """Validate that ``data`` is one of the order strings ``"C"`` or ``"F"``.

    Parameters
    ----------
    data : Any
        The value to validate.

    Returns
    -------
    Literal["C", "F"]
        ``data``, unchanged.

    Raises
    ------
    ValueError
        If ``data`` is not contained in ``("C", "F")``.
    """
    allowed = ("C", "F")
    if data not in allowed:
        raise ValueError(f"Expected one of ('C', 'F'), got {data} instead.")
    return cast(Literal["C", "F"], data)
def parse_bool(data: Any) -> bool:
    """Validate that ``data`` is a ``bool`` and return it unchanged.

    Parameters
    ----------
    data : Any
        The value to validate.

    Returns
    -------
    bool
        ``data``, unchanged.

    Raises
    ------
    ValueError
        If ``data`` is not an instance of ``bool``.
    """
    if not isinstance(data, bool):
        raise ValueError(f"Expected bool, got {data} instead.")
    return data
def parse_dtype(dtype: Any, zarr_format: ZarrFormat) -> np.dtype[Any]:
    """Convert ``dtype`` into a NumPy dtype, special-casing the string dtype.

    Parameters
    ----------
    dtype : Any
        The requested data type; ``str`` and ``"str"`` select the string dtype.
    zarr_format : ZarrFormat
        The Zarr format version.

    Returns
    -------
    np.dtype[Any]
        For the string dtype: ``np.dtype("object")`` when ``zarr_format`` is 2,
        otherwise ``_STRING_DTYPE``. For anything else: ``np.dtype(dtype)``.
    """
    if not (dtype is str or dtype == "str"):
        return np.dtype(dtype)
    if zarr_format == 2:
        # special case as object
        return np.dtype("object")
    return _STRING_DTYPE
def _warn_write_empty_chunks_kwarg() -> None:
    """Emit a ``RuntimeWarning`` stating that ``write_empty_chunks`` is deprecated.

    The message points at the two replacements: the ``config`` keyword
    argument and the global 'array.write_empty_chunks' configuration variable.
    ``stacklevel=2`` attributes the warning to the caller of this helper
    rather than to the helper itself.
    """
    # TODO: link to docs page on array configuration in this message
    msg = (
        "The `write_empty_chunks` keyword argument is deprecated and will be removed in future versions. "
        "To control whether empty chunks are written to storage, either use the `config` keyword "
        # The example is a valid dict literal and is followed by a space so
        # that it does not run into the next sentence fragment.
        "argument, as in `config={'write_empty_chunks': True}`, "
        "or change the global 'array.write_empty_chunks' configuration variable."
    )
    warnings.warn(msg, RuntimeWarning, stacklevel=2)
def _warn_order_kwarg() -> None:
    """Emit a ``RuntimeWarning`` stating that ``order`` has no effect for Zarr format 3 arrays.

    The message points at the two alternatives: the ``config`` keyword
    argument and the global 'array.order' configuration variable.
    ``stacklevel=2`` attributes the warning to the caller of this helper
    rather than to the helper itself.
    """
    # TODO: link to docs page on array configuration in this message
    msg = (
        "The `order` keyword argument has no effect for Zarr format 3 arrays. "
        "To control the memory layout of the array, either use the `config` keyword "
        # The example is a valid dict literal and is followed by a space so
        # that it does not run into the next sentence fragment.
        "argument, as in `config={'order': 'C'}`, "
        "or change the global 'array.order' configuration variable."
    )
    warnings.warn(msg, RuntimeWarning, stacklevel=2)
def _default_zarr_format() -> ZarrFormat:
    """Return the default Zarr format taken from the ``default_zarr_format`` config entry.

    The entry is looked up on ``zarr_config`` with 3 passed as the default
    argument, and the result is converted with ``int`` before being returned.
    """
    configured = zarr_config.get("default_zarr_format", 3)
    as_int = int(configured)
    return cast(ZarrFormat, as_int)