19
19
20
20
use std:: collections:: { HashMap , HashSet } ;
21
21
use std:: path:: { Path , PathBuf } ;
22
- use std:: sync:: Arc ;
22
+ use std:: sync:: { Arc , OnceLock } ;
23
23
use std:: time:: Duration ;
24
24
25
25
use anyhow:: Context ;
@@ -295,6 +295,16 @@ async fn list_splits_metadata(
295
295
Ok ( splits)
296
296
}
297
297
298
+ /// In order to avoid hammering the load on the metastore, we can throttle the rate of split
299
+ /// deletion by setting this environment variable.
300
+ fn get_maximum_split_deletion_rate_per_sec ( ) -> Option < usize > {
301
+ static MAXIMUM_SPLIT_DELETION_RATE_PER_SEC : std:: sync:: OnceLock < Option < usize > > =
302
+ OnceLock :: new ( ) ;
303
+ * MAXIMUM_SPLIT_DELETION_RATE_PER_SEC . get_or_init ( || {
304
+ quickwit_common:: get_from_env_opt :: < usize > ( "QW_MAX_SPLIT_DELETION_RATE_PER_SEC" )
305
+ } )
306
+ }
307
+
298
308
/// Removes any splits marked for deletion which haven't been
299
309
/// updated after `updated_before_timestamp` in batches of 1000 splits.
300
310
///
@@ -325,9 +335,18 @@ async fn delete_splits_marked_for_deletion_several_indexes(
325
335
. with_limit ( DELETE_SPLITS_BATCH_SIZE )
326
336
. sort_by_index_uid ( ) ;
327
337
328
- let mut splits_to_delete_possibly_remaining = true ;
338
+ loop {
339
+ let sleep_duration: Duration = if let Some ( maximum_split_deletion_per_sec) =
340
+ get_maximum_split_deletion_rate_per_sec ( )
341
+ {
342
+ Duration :: from_secs (
343
+ DELETE_SPLITS_BATCH_SIZE . div_ceil ( maximum_split_deletion_per_sec) as u64 ,
344
+ )
345
+ } else {
346
+ Duration :: default ( )
347
+ } ;
348
+ let sleep_future = tokio:: time:: sleep ( sleep_duration) ;
329
349
330
- while splits_to_delete_possibly_remaining {
331
350
let splits_metadata_to_delete: Vec < SplitMetadata > = match protect_future (
332
351
progress_opt,
333
352
list_splits_metadata ( & metastore, & list_splits_query) ,
@@ -345,7 +364,7 @@ async fn delete_splits_marked_for_deletion_several_indexes(
345
364
// To detect if this is the last page, we check if the number of splits is less than the
346
365
// limit.
347
366
assert ! ( splits_metadata_to_delete. len( ) <= DELETE_SPLITS_BATCH_SIZE ) ;
348
- splits_to_delete_possibly_remaining =
367
+ let splits_to_delete_possibly_remaining =
349
368
splits_metadata_to_delete. len ( ) == DELETE_SPLITS_BATCH_SIZE ;
350
369
351
370
// set split after which to search for the next loop
@@ -378,6 +397,14 @@ async fn delete_splits_marked_for_deletion_several_indexes(
378
397
& mut split_removal_info,
379
398
)
380
399
. await ;
400
+
401
+ if splits_to_delete_possibly_remaining {
402
+ sleep_future. await ;
403
+ } else {
404
+ // stop the gc if this was the last batch
405
+ // we are guaranteed to make progress due to .after_split()
406
+ break ;
407
+ }
381
408
}
382
409
383
410
split_removal_info
0 commit comments