forked from fsspec/filesystem_spec
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathreference.py
207 lines (184 loc) · 7.63 KB
/
reference.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
import base64
import itertools
import json
from ..asyn import AsyncFileSystem
from ..core import filesystem, open
class ReferenceFileSystem(AsyncFileSystem):
    """View byte ranges of some other file as a file system.

    Initial version: single file system target, which must support
    async, and must allow start and end args in _cat_file. Later versions
    may allow multiple arbitrary URLs for the targets.

    This FileSystem is read-only. It is designed to be used with async
    targets (for now). This FileSystem only allows whole-file access, no
    ``open``. We do not get original file details from the target FS.

    Configuration is by passing a dict of references at init, or a URL to
    a JSON file containing the same; this dict can also contain concrete
    data for some set of paths.

    Reference dict format:
    {path0: bytes_data, path1: (target_url, offset, size)}

    https://github.com/intake/fsspec-reference-maker/blob/main/README.md
    """

    protocol = "reference"

    def __init__(
        self,
        references,
        target=None,
        ref_storage_args=None,
        target_protocol=None,
        target_options=None,
        fs=None,
        **kwargs,
    ):
        """
        Parameters
        ----------
        references : dict or str
            The set of references to use for this instance, with a structure as above.
            If str, will use fsspec.open, in conjunction with ref_storage_args to
            open and parse JSON at this location.
        target : str
            For any references having target_url as None, this is the default file
            target to use
        ref_storage_args : dict
            If references is a str, use these kwargs for loading the JSON file
        target_protocol : str
            If fs is None, instantiate a file system using this protocol
        target_options : dict
            If fs is None, instantiate a filesystem using these kwargs
        fs : file system instance
            Directly provide a file system, if you want to configure it beforehand. This
            takes precedence over target_protocol/target_options
        kwargs : passed to parent class
        """
        if fs is not None:
            # a pre-built fs takes precedence; it must be async-capable and
            # we must share its event loop
            if not fs.async_impl:
                raise NotImplementedError("Only works with async targets")
            kwargs["loop"] = fs.loop
        super().__init__(**kwargs)
        if fs is None:
            fs = filesystem(target_protocol, loop=self.loop, **(target_options or {}))
            if not fs.async_impl:
                raise NotImplementedError("Only works with async targets")
        if isinstance(references, str):
            # references is a URL to a JSON document; load its raw bytes
            with open(references, "rb", **(ref_storage_args or {})) as f:
                text = f.read()
        else:
            text = references
        self.target = target
        self._process_references(text)
        self.fs = fs

    async def _cat_file(self, path):
        """Return the bytes for ``path``, either inline data or a byte
        range fetched from the target file system."""
        path = self._strip_protocol(path)
        part = self.references[path]
        if isinstance(part, bytes):
            # concrete data stored directly in the reference set
            return part
        elif isinstance(part, str):
            return part.encode()
        # otherwise a (url, offset, size) triple
        # NOTE(review): the third element is treated as a *size* here but as
        # an *end* offset in _dircache_from_items / _unmodel_hdf5 — confirm
        # which convention the reference spec intends
        url, start, size = part
        end = start + size
        if url is None:
            url = self.target
        return await self.fs._cat_file(url, start=start, end=end)

    def _process_references(self, references):
        """Parse a reference set (dict or JSON bytes) and populate
        ``self.references`` and the directory cache.

        Raises
        ------
        ValueError
            If the document declares an unknown spec version.
        """
        if isinstance(references, bytes):
            references = json.loads(references.decode())
        vers = references.get("version", None)
        if vers is None:
            self._process_references0(references)
        elif vers == 1:
            self._process_references1(references)
        else:
            raise ValueError(f"Unknown reference spec version: {vers}")
        # TODO: we make dircache by iteraring over all entries, but for Spec >= 1,
        # can replace with programmatic. Is it even needed for mapper interface?
        self._dircache_from_items()

    def _process_references0(self, references):
        """Make reference dict for Spec Version 0"""
        if "zarr_consolidated_format" in references:
            # special case for Ike prototype
            references = _unmodel_hdf5(references)
        self.references = references

    def _process_references1(self, references):
        """Make reference dict for Spec Version 1 (jinja2-templated)."""
        try:
            import jinja2
        except ImportError as e:
            raise ValueError("Reference Spec Version 1 requires jinja2") from e
        self.references = {}
        templates = {}
        for k, v in references.get("templates", {}).items():
            if "{{" in v:
                # parameterised template: defer rendering; bind the template
                # text via the default arg so each lambda keeps its own copy
                templates[k] = lambda temp=v, **kwargs: jinja2.Template(temp).render(
                    **kwargs
                )
            else:
                templates[k] = v
        for k, v in references["refs"].items():
            if isinstance(v, str):
                if v.startswith("base64:"):
                    # BUG FIX: the decoded bytes were previously overwritten
                    # unconditionally by the raw "base64:..." string; keep
                    # the decoded value and only store the plain string in
                    # the else branch
                    self.references[k] = base64.b64decode(v[7:])
                else:
                    self.references[k] = v
            else:
                u, off, length = v
                if "{{" in u:
                    u = jinja2.Template(u).render(**templates)
                self.references[k] = [u, off, length]
        for gen in references.get("gen", []):
            # each "gen" entry expands templated key/url/offset/length over
            # the cartesian product of its dimensions
            dimension = {
                k: v
                if isinstance(v, list)
                else range(v.get("start", 0), v["stop"], v.get("step", 1))
                for k, v in gen["dimensions"].items()
            }
            products = (
                dict(zip(dimension.keys(), values))
                for values in itertools.product(*dimension.values())
            )
            for pr in products:
                key = jinja2.Template(gen["key"]).render(**pr, **templates)
                url = jinja2.Template(gen["url"]).render(**pr, **templates)
                offset = int(jinja2.Template(gen["offset"]).render(**pr, **templates))
                length = int(jinja2.Template(gen["length"]).render(**pr, **templates))
                self.references[key] = [url, offset, length]

    def _dircache_from_items(self):
        """Build the listing cache by walking every reference entry and
        registering each file and all of its ancestor directories."""
        self.dircache = {"": []}
        for path, part in self.references.items():
            if isinstance(part, (bytes, str)):
                size = len(part)
            else:
                # NOTE(review): third element treated as an *end* offset
                # here, but as a *size* in _cat_file — confirm against spec
                _, start, end = part
                size = end - start
            par = self._parent(path)
            par0 = par
            while par0:
                # build parent directories; register each one with its own
                # parent only on first creation to avoid duplicate entries
                if par0 not in self.dircache:
                    self.dircache[par0] = []
                    self.dircache.setdefault(self._parent(par0), []).append(
                        {"name": par0, "type": "directory", "size": 0}
                    )
                par0 = self._parent(par0)
            self.dircache[par].append({"name": path, "type": "file", "size": size})

    async def _ls(self, path, detail=True, **kwargs):
        """List ``path`` from the pre-built directory cache."""
        path = self._strip_protocol(path)
        out = self._ls_from_cache(path)
        if detail:
            return out
        return [o["name"] for o in out]
def _unmodel_hdf5(references):
"""Special JSON format from HDF5"""
# see https://gist.github.com/ajelenak/80354a95b449cedea5cca508004f97a9
import re
ref = {}
for key, value in references["metadata"].items():
if key.endswith(".zchunkstore"):
source = value.pop("source")["uri"]
match = re.findall(r"https://([^.]+)\.s3\.amazonaws\.com", source)
if match:
source = source.replace(
f"https://{match[0]}.s3.amazonaws.com", match[0]
)
for k, v in value.items():
ref[k] = (source, v["offset"], v["offset"] + v["size"])
else:
ref[key] = json.dumps(value).encode()
return ref