Skip to content

Commit 497948b

Browse files
committed
Add chunked/non-chunked storage managers
1 parent 97823a3 commit 497948b

File tree

6 files changed

+388
-170
lines changed

6 files changed

+388
-170
lines changed

bossphorus/__main__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,6 @@
1818

1919

2020
def main():
    """Create the Bossphorus app and run it on 0.0.0.0, port 5000."""
    application = bossphorus.create_app()
    application.run(host="0.0.0.0", port=5000)
Lines changed: 267 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,267 @@
1+
"""
2+
Copyright 2018 The Johns Hopkins University Applied Physics Laboratory.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
"""
16+
from typing import Tuple, List
17+
from abc import ABC, abstractmethod
18+
19+
import os
20+
import numpy as np
21+
22+
from .StorageManager import StorageManager
23+
from .utils import file_compute, blockfile_indices
24+
25+
26+
class ChunkedFileInterface(ABC):
    """
    A filesystem manager that handles transit from numpy in-memory to a
    static format on disk.

    Concrete subclasses implement `store` and `retrieve` for one on-disk
    format and should override `format_name` with a label for that format.
    """

    # Label of the on-disk format; the base class has none.
    format_name = "None"

    @abstractmethod
    def store(
        self,
        data: np.ndarray,
        col: str,
        exp: str,
        chan: str,
        res: int,
        b: Tuple[int, int, int],
    ) -> None:
        """
        Write a single block to disk.

        Arguments:
            data: The array to persist for this block
            col: Collection name
            exp: Experiment name
            chan: Channel name
            res: Resolution level
            b: Three-element index identifying the block
        """
        ...

    @abstractmethod
    def retrieve(
        self, col: str, exp: str, chan: str, res: int, b: Tuple[int, int, int]
    ) -> np.ndarray:
        """
        Read a single block back from disk.

        Arguments:
            col: Collection name
            exp: Experiment name
            chan: Channel name
            res: Resolution level
            b: Three-element index identifying the block

        Returns:
            The array previously stored for this block
        """
        ...
51+
52+
53+
class NpyChunkedFileInterface(ChunkedFileInterface):
    """
    Block-file backend that keeps every block in its own ``.npy`` file.

    Files are laid out as
    ``<storage_path>/<col>/<exp>/<chan>/<res>-<x>-<y>-<z>.npy`` where each of
    ``<x>``, ``<y>``, ``<z>`` is the formatted ``(start, stop)`` tuple of the
    block along that axis (``stop = start + block_size[axis]``).
    """

    def __init__(self, storage_path: str, block_size: Tuple[int, int, int]):
        """
        Create a new NpyChunkedFileInterface.

        Arguments:
            storage_path: Root directory of the data tree
            block_size: Extent of one block along each of the three axes
        """
        self.storage_path = storage_path
        self.block_size = block_size
        self.format_name = "NPY"

    def __repr__(self):
        return "<NpyChunkedFileInterface>"

    def _channel_dir(self, col: str, exp: str, chan: str) -> str:
        """Return the directory that holds every block of one channel."""
        return "{}/{}/{}/{}".format(self.storage_path, col, exp, chan)

    def _block_path(
        self, col: str, exp: str, chan: str, res: int, b: Tuple[int, int, int]
    ) -> str:
        """Return the file path for the block whose origin is `b`."""
        # The (start, stop) tuples are formatted as-is, e.g. "(0, 256)".
        # Keep this format stable: files already on disk are found by name.
        return "{}/{}-{}-{}-{}.npy".format(
            self._channel_dir(col, exp, chan),
            res,
            (b[0], b[0] + self.block_size[0]),
            (b[1], b[1] + self.block_size[1]),
            (b[2], b[2] + self.block_size[2]),
        )

    def store(
        self,
        data: np.ndarray,
        col: str,
        exp: str,
        chan: str,
        res: int,
        b: Tuple[int, int, int],
    ) -> None:
        """
        Store a single block file.

        Creates the channel directory if it does not exist yet and
        overwrites any file already present for this block.

        Arguments:
            data: The array to save
            col: Collection name
            exp: Experiment name
            chan: Channel name
            res: Resolution level
            b: Origin of the block along each axis
        """
        os.makedirs(self._channel_dir(col, exp, chan), exist_ok=True)
        np.save(self._block_path(col, exp, chan, res, b), data)

    def retrieve(
        self, col: str, exp: str, chan: str, res: int, b: Tuple[int, int, int]
    ) -> np.ndarray:
        """
        Pull a single block from disk.

        Arguments:
            col: Collection name
            exp: Experiment name
            chan: Channel name
            res: Resolution level
            b: Origin of the block along each axis

        Returns:
            The array loaded from the block file

        Raises:
            IOError: If the channel directory does not exist. A missing block
                file inside an existing channel directory surfaces as the
                error raised by `np.load`.
        """
        # Checking the deepest directory is enough: it can only be a
        # directory if each of its parents is one as well.
        if not os.path.isdir(self._channel_dir(col, exp, chan)):
            raise IOError("{}/{}/{} not found.".format(col, exp, chan))
        return np.load(self._block_path(col, exp, chan, res, b))
121+
122+
123+
class ChunkedFilesystemStorageManager(StorageManager):
    """
    File System management for volumetric data.

    Contains logic for reading and writing to local filesystem. Requests for
    an arbitrary cuboid are split into fixed-size blocks, and each block is
    read from or written to its own file through `self.fs`.
    """

    def __init__(
        self, storage_path: str, block_size: Tuple[int, int, int], **kwargs
    ) -> None:
        """
        Create a new ChunkedFileSystemStorageManager.

        Arguments:
            storage_path: Where to store the data tree
            block_size: How much data should go in each file
            preferred_format (str: npy): file format you prefer to use on disk
            next_layer: Optional storage manager behind this one; when given,
                this manager is not terminal

        Raises:
            ValueError: If `preferred_format` is not a supported format
        """
        self.name = "ChunkedFilesystemStorageManager"
        if "next_layer" in kwargs:
            self._next = kwargs["next_layer"]
            self.is_terminal = False
        else:
            self.is_terminal = True
        self.storage_path = storage_path
        self.block_size = block_size

        formats = {"npy": NpyChunkedFileInterface}
        preferred_format = kwargs.get("preferred_format", "npy")
        if preferred_format not in formats:
            # Fail with a readable message instead of calling None.
            raise ValueError(
                "Unsupported preferred_format {!r}; expected one of {}".format(
                    preferred_format, sorted(formats)
                )
            )
        self.fs = formats[preferred_format](self.storage_path, self.block_size)

    @staticmethod
    def _block_and_volume_slices(f, i, xs, ys, zs):
        """
        Translate one block's extents into a pair of 3D slice tuples.

        Arguments:
            f: Origin of the block along each axis
            i: Per-axis (start, stop) pairs, relative to the block origin
            xs, ys, zs: Requested (start, stop) range along each axis

        Returns:
            (block_slices, volume_slices): the first indexes into the block
            array, the second indexes the same region inside the array that
            spans the requested ranges.
        """
        origin = (xs[0], ys[0], zs[0])
        block_slices = tuple(slice(i[a][0], i[a][1]) for a in range(3))
        volume_slices = tuple(
            slice((f[a] + i[a][0]) - origin[a], (f[a] + i[a][1]) - origin[a])
            for a in range(3)
        )
        return block_slices, volume_slices

    def hasdata(
        self,
        col: str,
        exp: str,
        chan: str,
        res: int,
        xs: Tuple[int, int],
        ys: Tuple[int, int],
        zs: Tuple[int, int],
    ) -> bool:
        """
        Report whether this manager should serve the requested region.

        Currently this only reflects whether the manager is terminal; the
        arguments are not inspected.
        """
        # TODO: Should know when it has data and return false even if it's
        # in terminal mode
        return self.is_terminal

    def setdata(
        self,
        data: np.ndarray,
        col: str,
        exp: str,
        chan: str,
        res: int,
        xs: Tuple[int, int],
        ys: Tuple[int, int],
        zs: Tuple[int, int],
    ) -> None:
        """
        Upload the file.

        Each block touched by the request is loaded (or created as zeros),
        patched with the matching part of `data`, and written back.

        Arguments:
            data: Array spanning the requested x, y and z ranges
            col: Collection name
            exp: Experiment name
            chan: Channel name
            res: Resolution level
            xs, ys, zs: (start, stop) range along each axis
        """
        # Chunk the file into its parts.
        # presumably `files` holds block origins and `indices` the per-axis
        # (start, stop) extents inside each block — inferred from how they
        # are indexed below; confirm against .utils.
        files = file_compute(
            xs[0], xs[1], ys[0], ys[1], zs[0], zs[1], block_size=self.block_size
        )
        indices = blockfile_indices(xs, ys, zs, block_size=self.block_size)

        for f, i in zip(files, indices):
            try:
                data_partial = self.fs.retrieve(col, exp, chan, res, f)
            except Exception:
                # Block could not be loaded: start from an empty block.
                # NOTE(review): new blocks are always uint8, whatever the
                # dtype of `data` is.
                data_partial = np.zeros(self.block_size, dtype="uint8")

            block_slices, volume_slices = self._block_and_volume_slices(
                f, i, xs, ys, zs
            )
            data_partial[block_slices] = data[volume_slices]
            self.fs.store(data_partial, col, exp, chan, res, f)

    def getdata(
        self,
        col: str,
        exp: str,
        chan: str,
        res: int,
        xs: Tuple[int, int],
        ys: Tuple[int, int],
        zs: Tuple[int, int],
    ) -> np.ndarray:
        """
        Get the data from disk.

        Arguments:
            col: Collection name
            exp: Experiment name
            chan: Channel name
            res: Resolution level
            xs, ys, zs: (start, stop) range along each axis

        Returns:
            A uint8 array of shape (xs[1]-xs[0], ys[1]-ys[0], zs[1]-zs[0]);
            regions whose block cannot be loaded are zero
        """
        files = file_compute(
            xs[0], xs[1], ys[0], ys[1], zs[0], zs[1], block_size=self.block_size
        )
        indices = blockfile_indices(xs, ys, zs, block_size=self.block_size)

        payload = np.zeros(
            ((xs[1] - xs[0]), (ys[1] - ys[0]), (zs[1] - zs[0])), dtype="uint8"
        )
        for f, i in zip(files, indices):
            block_slices, volume_slices = self._block_and_volume_slices(
                f, i, xs, ys, zs
            )
            try:
                data_partial = self.fs.retrieve(col, exp, chan, res, f)[
                    block_slices
                ]
            except Exception:
                # `Exception` rather than a bare `except`, so that
                # KeyboardInterrupt and SystemExit still propagate.
                data_partial = np.zeros(self.block_size, dtype="uint8")[
                    block_slices
                ]
            payload[volume_slices] = data_partial

        return payload

    def __str__(self):
        return f"<ChunkedFilesystemStorageManager [{str(self.fs)}]>"

    def get_stack_names(self) -> List[str]:
        """
        Get a list of the names of the storage managers that back this one.

        Arguments:
            None

        Returns:
            List[str]: This manager's string form, followed by the names
            reported by the next layer when this manager is not terminal

        """
        if self.is_terminal:
            return [str(self)]
        return [str(self), *self._next.get_stack_names()]

0 commit comments

Comments
 (0)