Skip to content

Commit f75ddea

Browse files
committed
close method for api
1 parent 97e2b08 commit f75ddea

File tree

2 files changed

+11
-0
lines changed

2 files changed

+11
-0
lines changed

parsl/dataflow/dflow.py

+5
Original file line numberDiff line numberDiff line change
@@ -1203,6 +1203,7 @@ def cleanup(self) -> None:
12031203

12041204
self.log_task_states()
12051205

1206+
# TODO: do this in the basic memoizer
12061207
# Checkpointing takes priority over the rest of the tasks
12071208
# checkpoint if any valid checkpoint method is specified
12081209
if self.checkpoint_mode is not None:
@@ -1215,6 +1216,10 @@ def cleanup(self) -> None:
12151216
logger.info("Stopping checkpoint timer")
12161217
self._checkpoint_timer.close()
12171218

1219+
logger.info("Closing memoizer")
1220+
self.memoizer.close()
1221+
logger.info("Closed memoizer")
1222+
12181223
# Send final stats
12191224
self.usage_tracker.send_end_message()
12201225
self.usage_tracker.close()

parsl/dataflow/memoization.py

+6
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,9 @@ class Memoizer:
161161
def start(self, *, dfk: DataFlowKernel, memoize: bool = True, checkpoint_files: Sequence[str], run_dir: str) -> None:
162162
raise NotImplementedError
163163

164+
def close(self) -> None:
165+
raise NotImplementedError
166+
164167
def update_memo(self, task: TaskRecord, r: Future[Any]) -> None:
165168
raise NotImplementedError
166169

@@ -236,6 +239,9 @@ def start(self, *, dfk: DataFlowKernel, memoize: bool = True, checkpoint_files:
236239
logger.info("App caching disabled for all apps")
237240
self.memo_lookup_table = {}
238241

242+
def close(self) -> None:
243+
pass # nothing to close but more should move here
244+
239245
def check_memo(self, task: TaskRecord) -> Optional[Future[Any]]:
240246
"""Create a hash of the task and its inputs and check the lookup table for this hash.
241247

0 commit comments

Comments
 (0)