@@ -121,6 +121,42 @@ def id_for_memo_function(f: types.FunctionType, output_ref: bool = False) -> byt
121
121
return pickle .dumps (["types.FunctionType" , f .__name__ , f .__module__ ])
122
122
123
123
124
def make_hash(task: TaskRecord) -> str:
    """Create a hash of the task inputs.

    The hash is built from the byte normalisations (via ``id_for_memo``) of
    the task's ``outputs`` keyword argument (if present and not ignored),
    the remaining keyword arguments, the task function and the positional
    arguments, concatenated in that order.

    Args:
        - task (dict) : Task dictionary from dfk.tasks. The keys read here
          are 'kwargs', 'ignore_for_cache', 'func' and 'args'.

    Returns:
        - hash (str) : Hexadecimal MD5 digest of the normalised task inputs
    """

    t: List[bytes] = []

    # if kwargs contains an outputs parameter, that parameter is removed
    # and normalised differently - with output_ref set to True.
    # kwargs listed in ignore_for_cache will also be removed

    # Shallow copy so that the kwargs stored in the task record are untouched.
    filtered_kw = task['kwargs'].copy()

    ignore_list = task['ignore_for_cache']

    logger.debug("Ignoring these kwargs for checkpointing: %s", ignore_list)
    for k in ignore_list:
        logger.debug("Ignoring kwarg %s", k)
        # pop() instead of del: a name in the ignore list may be absent from
        # kwargs (for example, not supplied as a keyword for this
        # invocation); that must not raise KeyError.
        filtered_kw.pop(k, None)

    # Look at the filtered copy rather than the raw kwargs: if 'outputs' was
    # itself listed in ignore_for_cache it is already gone, must not be
    # deleted a second time, and must not contribute to the hash.
    if 'outputs' in filtered_kw:
        outputs = filtered_kw.pop('outputs')
        t.append(id_for_memo(outputs, output_ref=True))

    t.extend(map(id_for_memo, (filtered_kw, task['func'], task['args'])))

    x = b''.join(t)
    return hashlib.md5(x).hexdigest()
158
+
159
+
124
160
class Memoizer :
125
161
def start (self , * , dfk : DataFlowKernel , memoize : bool = True , checkpoint_files : Sequence [str ], run_dir : str ) -> None :
126
162
raise NotImplementedError
@@ -200,41 +236,6 @@ def start(self, *, dfk: DataFlowKernel, memoize: bool = True, checkpoint_files:
200
236
logger .info ("App caching disabled for all apps" )
201
237
self .memo_lookup_table = {}
202
238
203
def make_hash(self, task: TaskRecord) -> str:
    """Create a hash of the task inputs.

    The hash is built from the byte normalisations (via ``id_for_memo``) of
    the task's ``outputs`` keyword argument (if present and not ignored),
    the remaining keyword arguments, the task function and the positional
    arguments, concatenated in that order. No instance state is read.

    Args:
        - task (dict) : Task dictionary from dfk.tasks. The keys read here
          are 'kwargs', 'ignore_for_cache', 'func' and 'args'.

    Returns:
        - hash (str) : Hexadecimal MD5 digest of the normalised task inputs
    """

    t: List[bytes] = []

    # if kwargs contains an outputs parameter, that parameter is removed
    # and normalised differently - with output_ref set to True.
    # kwargs listed in ignore_for_cache will also be removed

    # Shallow copy so that the kwargs stored in the task record are untouched.
    filtered_kw = task['kwargs'].copy()

    ignore_list = task['ignore_for_cache']

    logger.debug("Ignoring these kwargs for checkpointing: %s", ignore_list)
    for k in ignore_list:
        logger.debug("Ignoring kwarg %s", k)
        # pop() instead of del: a name in the ignore list may be absent from
        # kwargs (for example, not supplied as a keyword for this
        # invocation); that must not raise KeyError.
        filtered_kw.pop(k, None)

    # Look at the filtered copy rather than the raw kwargs: if 'outputs' was
    # itself listed in ignore_for_cache it is already gone, must not be
    # deleted a second time, and must not contribute to the hash.
    if 'outputs' in filtered_kw:
        outputs = filtered_kw.pop('outputs')
        t.append(id_for_memo(outputs, output_ref=True))

    t.extend(map(id_for_memo, (filtered_kw, task['func'], task['args'])))

    x = b''.join(t)
    return hashlib.md5(x).hexdigest()
237
-
238
239
def check_memo (self , task : TaskRecord ) -> Optional [Future [Any ]]:
239
240
"""Create a hash of the task and its inputs and check the lookup table for this hash.
240
241
@@ -256,7 +257,7 @@ def check_memo(self, task: TaskRecord) -> Optional[Future[Any]]:
256
257
logger .debug ("Task {} will not be memoized" .format (task_id ))
257
258
return None
258
259
259
- hashsum = self . make_hash (task )
260
+ hashsum = make_hash (task )
260
261
logger .debug ("Task {} has memoization hash {}" .format (task_id , hashsum ))
261
262
result = None
262
263
if hashsum in self .memo_lookup_table :
0 commit comments