18
18
19
19
LOG = logging .getLogger ("pubtools.pulp" )
20
20
21
+ MAX_RETRIES = int (os .getenv ("PUBTOOLS_MAX_COPY_RETRIES" ) or "5" )
22
+
21
23
22
24
def supports_type (pushitem_type ):
23
25
"""Decorator used to define which PulpPushItem subclass implements support
@@ -200,20 +202,88 @@ def associated_items_single_batch(cls, pulp_client, items, copy_options):
200
202
201
203
It is guaranteed that every yielded item exists in the desired
202
204
target repos in Pulp. A fatal error occurs if this can't be done
203
- for any item in the batch.
205
+ for any item in the batch. A retry mechanism is in place for those
206
+ items that weren't possible to copy due to race conditions.
204
207
"""
208
+ retries = 0
209
+ unit_type = items [0 ].unit_type
210
+ _items_to_process = items
205
211
206
- copy_crit = {}
207
- copy_opers = {}
208
- copy_results = []
212
+ while retries <= MAX_RETRIES :
213
+ copy_crit , copy_items , nocopy_items = cls ._prepare_copy_items (
214
+ _items_to_process
215
+ )
216
+ copy_opers , copy_results = cls ._submit_copies (
217
+ pulp_client , unit_type , copy_crit , copy_options
218
+ )
219
+
220
+ # Copies have been started.
221
+ # Any items which didn't need a copy can be immediately yielded now.
222
+ if nocopy_items :
223
+ yield f_return (nocopy_items )
224
+
225
+ # Add some reasonable logging onto the copies...
226
+ def log_copy_done (f ):
227
+ if not f .cancelled () and not f .exception ():
228
+ tasks = f .result ()
229
+ oper = copy_opers [f ]
230
+ for t in tasks :
231
+ oper .log_copy_done (t )
232
+
233
+ for f in copy_results :
234
+ f .add_done_callback (log_copy_done )
235
+
236
+ # A helper to refresh the state of each item in Pulp and make sure they
237
+ # were copied OK.
238
+ def refresh_after_copy (_ ):
239
+ # Get an up-to-date version of all the copy items.
240
+ f = cls .items_with_pulp_state_single_batch (pulp_client , copy_items )
241
+
242
+ asserting_all_copied_ok_maybe_fatal = partial (
243
+ asserting_all_copied_ok , fatal = retries >= MAX_RETRIES
244
+ )
245
+ # Raise if any still have missing repos, only if we attempted all retries.
246
+ f = f_map (f , asserting_all_copied_ok_maybe_fatal )
247
+
248
+ return f
249
+
250
+ # This future completes once *all* copies are done successfully.
251
+ # TODO: this still could be improved, as not every item needs every copy
252
+ # before the state could be refreshed.
253
+ all_copies = f_sequence (copy_results )
254
+ # To finish up: wait for all copies to complete, then refresh item states
255
+ # and ensure they're no longer missing any repos.
256
+ finished = f_flat_map (all_copies , refresh_after_copy )
257
+
258
+ to_retry = []
259
+ to_yield = []
260
+ for item in finished .result ():
261
+ if item .missing_pulp_repos :
262
+ to_retry .append (item )
263
+ else :
264
+ to_yield .append (item )
265
+ # yield successfully copied items
266
+ yield f_return (to_yield )
267
+
268
+ # if there are not items for copy, end retry loop
269
+ if not to_retry :
270
+ break
271
+ # otherwise increment retry counter and try copy with unsuccessfully copied items
272
+ retries += 1
273
+ _items_to_process = to_retry
274
+ LOG .info (
275
+ "Retrying copy for %s item(s). Attempt %s/%s" ,
276
+ len (to_retry ),
277
+ retries ,
278
+ MAX_RETRIES ,
279
+ )
209
280
281
+ @classmethod
282
+ def _prepare_copy_items (cls , items ):
283
+ copy_crit = {}
210
284
copy_items = []
211
285
nocopy_items = []
212
286
213
- unit_type = items [0 ].unit_type
214
-
215
- base_crit = Criteria .with_unit_type (unit_type ) if unit_type else None
216
-
217
287
for item in items :
218
288
if not item .missing_pulp_repos :
219
289
# Don't need to do anything with this item.
@@ -234,6 +304,15 @@ def associated_items_single_batch(cls, pulp_client, items, copy_options):
234
304
key = (src_repo_id , dest_repo_id )
235
305
copy_crit .setdefault (key , []).append (crit )
236
306
307
+ return copy_crit , copy_items , nocopy_items
308
+
309
+ @classmethod
310
+ def _submit_copies (cls , pulp_client , unit_type , copy_crit , copy_options ):
311
+ copy_opers = {}
312
+ copy_results = []
313
+
314
+ base_crit = Criteria .with_unit_type (unit_type ) if unit_type else None
315
+
237
316
for key in copy_crit .keys ():
238
317
(src_repo_id , dest_repo_id ) = key
239
318
@@ -254,41 +333,7 @@ def associated_items_single_batch(cls, pulp_client, items, copy_options):
254
333
255
334
copy_results .append (copy_f )
256
335
257
- # Copies have been started.
258
- # Any items which didn't need a copy can be immediately yielded now.
259
- if nocopy_items :
260
- yield f_return (nocopy_items )
261
-
262
- # Add some reasonable logging onto the copies...
263
- def log_copy_done (f ):
264
- if not f .cancelled () and not f .exception ():
265
- tasks = f .result ()
266
- oper = copy_opers [f ]
267
- for t in tasks :
268
- oper .log_copy_done (t )
269
-
270
- for f in copy_results :
271
- f .add_done_callback (log_copy_done )
272
-
273
- # A helper to refresh the state of each item in Pulp and make sure they
274
- # were copied OK.
275
- def refresh_after_copy (_ ):
276
- # Get an up-to-date version of all the copy items.
277
- f = cls .items_with_pulp_state_single_batch (pulp_client , copy_items )
278
-
279
- # Raise if any still have missing repos.
280
- f = f_map (f , asserting_all_copied_ok )
281
-
282
- return f
283
-
284
- # This future completes once *all* copies are done successfully.
285
- # TODO: this still could be improved, as not every item needs every copy
286
- # before the state could be refreshed.
287
- all_copies = f_sequence (copy_results )
288
-
289
- # To finish up: wait for all copies to complete, then refresh item states
290
- # and ensure they're no longer missing any repos.
291
- yield f_flat_map (all_copies , refresh_after_copy )
336
+ return copy_opers , copy_results
292
337
293
338
@property
294
339
def in_pulp_repos (self ):
0 commit comments