Skip to content

Commit 40296a1

Browse files
[Feature] More efficient cache directory structure (#681)
* [Feature] More efficient cache directory structure * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * fixes * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * bug fix * another bug fix --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent efb9717 commit 40296a1

File tree

5 files changed

+31
-39
lines changed

5 files changed

+31
-39
lines changed

executorlib/standalone/cache.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,8 @@ def get_cache_data(cache_directory: str) -> list[dict]:
2828

2929
file_lst = []
3030
for task_key in os.listdir(cache_directory):
31-
file_name = os.path.join(cache_directory, task_key, "cache.h5out")
32-
os.makedirs(os.path.join(cache_directory, task_key), exist_ok=True)
33-
if os.path.exists(file_name):
31+
file_name = os.path.join(cache_directory, task_key)
32+
if task_key[-5:] == "_o.h5":
3433
with h5py.File(file_name, "r") as hdf:
3534
file_content_dict = {
3635
key: cloudpickle.loads(np.void(hdf["/" + key]))

executorlib/task_scheduler/file/backend.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,19 +42,19 @@ def backend_write_file(file_name: str, output: Any, runtime: float) -> None:
4242
None
4343
4444
"""
45-
file_name_out = os.path.splitext(file_name)[0]
46-
os.rename(file_name, file_name_out + ".h5ready")
45+
file_name_out = os.path.splitext(file_name)[0][:-2]
46+
os.rename(file_name, file_name_out + "_r.h5")
4747
if "result" in output:
4848
dump(
49-
file_name=file_name_out + ".h5ready",
49+
file_name=file_name_out + "_r.h5",
5050
data_dict={"output": output["result"], "runtime": runtime},
5151
)
5252
else:
5353
dump(
54-
file_name=file_name_out + ".h5ready",
54+
file_name=file_name_out + "_r.h5",
5555
data_dict={"error": output["error"], "runtime": runtime},
5656
)
57-
os.rename(file_name_out + ".h5ready", file_name_out + ".h5out")
57+
os.rename(file_name_out + "_r.h5", file_name_out + "_o.h5")
5858

5959

6060
def backend_execute_task_in_file(file_name: str) -> None:

executorlib/task_scheduler/file/shared.py

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -108,13 +108,9 @@ def execute_tasks_h5(
108108
resource_dict=task_resource_dict,
109109
)
110110
if task_key not in memory_dict:
111-
if not (
112-
task_key in os.listdir(cache_directory)
113-
and "cache.h5out"
114-
in os.listdir(os.path.join(cache_directory, task_key))
115-
):
116-
os.makedirs(os.path.join(cache_directory, task_key), exist_ok=True)
117-
file_name = os.path.join(cache_directory, task_key, "cache.h5in")
111+
if task_key + "_o.h5" not in os.listdir(cache_directory):
112+
os.makedirs(cache_directory, exist_ok=True)
113+
file_name = os.path.join(cache_directory, task_key + "_i.h5")
118114
dump(file_name=file_name, data_dict=data_dict)
119115
if not disable_dependencies:
120116
task_dependent_lst = [
@@ -138,10 +134,10 @@ def execute_tasks_h5(
138134
resource_dict=task_resource_dict,
139135
config_directory=pysqa_config_directory,
140136
backend=backend,
141-
cache_directory=os.path.join(cache_directory, task_key),
137+
cache_directory=cache_directory,
142138
)
143139
file_name_dict[task_key] = os.path.join(
144-
cache_directory, task_key, "cache.h5out"
140+
cache_directory, task_key + "_o.h5"
145141
)
146142
memory_dict[task_key] = task_dict["future"]
147143
future_queue.task_done()
@@ -197,7 +193,7 @@ def _check_task_output(
197193
Future: The updated future object.
198194
199195
"""
200-
file_name = os.path.join(cache_directory, task_key, "cache.h5out")
196+
file_name = os.path.join(cache_directory, task_key + "_o.h5")
201197
if not os.path.exists(file_name):
202198
return future_obj
203199
exec_flag, no_error_flag, result = get_output(file_name=file_name)

executorlib/task_scheduler/interactive/shared.py

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -151,12 +151,9 @@ def _execute_task_with_cache(
151151
fn_kwargs=task_dict["kwargs"],
152152
resource_dict=task_dict.get("resource_dict", {}),
153153
)
154-
os.makedirs(os.path.join(cache_directory, task_key), exist_ok=True)
155-
file_name = os.path.join(cache_directory, task_key, "cache.h5out")
156-
if not (
157-
task_key in os.listdir(cache_directory)
158-
and "cache.h5out" in os.listdir(os.path.join(cache_directory, task_key))
159-
):
154+
os.makedirs(cache_directory, exist_ok=True)
155+
file_name = os.path.join(cache_directory, task_key + "_o.h5")
156+
if task_key + "_o.h5" not in os.listdir(cache_directory):
160157
f = task_dict.pop("future")
161158
if f.set_running_or_notify_cancel():
162159
try:

tests/test_cache_backend_execute.py

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@ def test_execute_function_mixed(self):
3535
fn_args=[1],
3636
fn_kwargs={"b": 2},
3737
)
38-
file_name = os.path.join(cache_directory, task_key, "cache.h5in")
39-
os.makedirs(os.path.join(cache_directory, task_key), exist_ok=True)
38+
file_name = os.path.join(cache_directory, task_key + "_i.h5")
39+
os.makedirs(cache_directory, exist_ok=True)
4040
dump(file_name=file_name, data_dict=data_dict)
4141
backend_execute_task_in_file(file_name=file_name)
4242
future_obj = Future()
@@ -46,11 +46,11 @@ def test_execute_function_mixed(self):
4646
self.assertTrue(future_obj.done())
4747
self.assertEqual(future_obj.result(), 3)
4848
self.assertTrue(
49-
get_runtime(file_name=os.path.join(cache_directory, task_key, "cache.h5out"))
49+
get_runtime(file_name=os.path.join(cache_directory, task_key + "_o.h5"))
5050
> 0.0
5151
)
5252
future_file_obj = FutureItem(
53-
file_name=os.path.join(cache_directory, task_key, "cache.h5out")
53+
file_name=os.path.join(cache_directory, task_key + "_o.h5")
5454
)
5555
self.assertTrue(future_file_obj.done())
5656
self.assertEqual(future_file_obj.result(), 3)
@@ -63,7 +63,7 @@ def test_execute_function_args(self):
6363
fn_args=[1, 2],
6464
fn_kwargs={},
6565
)
66-
file_name = os.path.join(cache_directory, task_key, "cache.h5in")
66+
file_name = os.path.join(cache_directory, task_key + "_i.h5")
6767
os.makedirs(os.path.join(cache_directory, task_key), exist_ok=True)
6868
dump(file_name=file_name, data_dict=data_dict)
6969
backend_execute_task_in_file(file_name=file_name)
@@ -74,11 +74,11 @@ def test_execute_function_args(self):
7474
self.assertTrue(future_obj.done())
7575
self.assertEqual(future_obj.result(), 3)
7676
self.assertTrue(
77-
get_runtime(file_name=os.path.join(cache_directory, task_key, "cache.h5out"))
77+
get_runtime(file_name=os.path.join(cache_directory, task_key + "_o.h5"))
7878
> 0.0
7979
)
8080
future_file_obj = FutureItem(
81-
file_name=os.path.join(cache_directory, task_key, "cache.h5out")
81+
file_name=os.path.join(cache_directory, task_key + "_o.h5")
8282
)
8383
self.assertTrue(future_file_obj.done())
8484
self.assertEqual(future_file_obj.result(), 3)
@@ -91,8 +91,8 @@ def test_execute_function_kwargs(self):
9191
fn_args=[],
9292
fn_kwargs={"a": 1, "b": 2},
9393
)
94-
file_name = os.path.join(cache_directory, task_key, "cache.h5in")
95-
os.makedirs(os.path.join(cache_directory, task_key), exist_ok=True)
94+
file_name = os.path.join(cache_directory, task_key + "_i.h5")
95+
os.makedirs(cache_directory, exist_ok=True)
9696
dump(file_name=file_name, data_dict=data_dict)
9797
backend_execute_task_in_file(file_name=file_name)
9898
future_obj = Future()
@@ -102,11 +102,11 @@ def test_execute_function_kwargs(self):
102102
self.assertTrue(future_obj.done())
103103
self.assertEqual(future_obj.result(), 3)
104104
self.assertTrue(
105-
get_runtime(file_name=os.path.join(cache_directory, task_key, "cache.h5out"))
105+
get_runtime(file_name=os.path.join(cache_directory, task_key + "_o.h5"))
106106
> 0.0
107107
)
108108
future_file_obj = FutureItem(
109-
file_name=os.path.join(cache_directory, task_key, "cache.h5out")
109+
file_name=os.path.join(cache_directory, task_key + "_o.h5")
110110
)
111111
self.assertTrue(future_file_obj.done())
112112
self.assertEqual(future_file_obj.result(), 3)
@@ -119,8 +119,8 @@ def test_execute_function_error(self):
119119
fn_args=[],
120120
fn_kwargs={"a": 1},
121121
)
122-
file_name = os.path.join(cache_directory, task_key, "cache.h5in")
123-
os.makedirs(os.path.join(cache_directory, task_key), exist_ok=True)
122+
file_name = os.path.join(cache_directory, task_key + "_i.h5")
123+
os.makedirs(cache_directory, exist_ok=True)
124124
dump(file_name=file_name, data_dict=data_dict)
125125
backend_execute_task_in_file(file_name=file_name)
126126
future_obj = Future()
@@ -131,11 +131,11 @@ def test_execute_function_error(self):
131131
with self.assertRaises(ValueError):
132132
future_obj.result()
133133
self.assertTrue(
134-
get_runtime(file_name=os.path.join(cache_directory, task_key, "cache.h5out"))
134+
get_runtime(file_name=os.path.join(cache_directory, task_key + "_o.h5"))
135135
> 0.0
136136
)
137137
future_file_obj = FutureItem(
138-
file_name=os.path.join(cache_directory, task_key, "cache.h5out")
138+
file_name=os.path.join(cache_directory, task_key + "_o.h5")
139139
)
140140
self.assertTrue(future_file_obj.done())
141141
with self.assertRaises(ValueError):

0 commit comments

Comments
 (0)