@@ -8,7 +8,7 @@ use crate::{
     config::Config,
     erasure::Shard,
     meta::{Checksum, MetaData, ShardInfo},
-    zdb::{SequentialZdb, ZdbError, ZdbResult},
+    zdb::{Key, SequentialZdb, ZdbConnectionInfo, ZdbError, ZdbResult},
     ZstorError, ZstorResult,
 };
 use actix::prelude::*;
@@ -324,6 +324,7 @@ impl Handler<Rebuild> for ZstorActor {
             };

             let input = load_data(&old_metadata).await?;
+            let existing_data = input.clone();
             let (mut metadata, shards) = pipeline
                 .send(RebuildData {
                     input,
@@ -332,7 +333,31 @@ impl Handler<Rebuild> for ZstorActor {
                 })
                 .await??;

-            save_data(&mut cfg.deref().clone(), shards, &mut metadata).await?;
+            // build a list of the key and the backend used for the shards
+            let mut used_backends = Vec::new();
+            for (i, data) in existing_data.iter().enumerate() {
+                let key = old_metadata.shards()[i].key().to_vec();
+                if let Some(data) = data {
+                    if data.as_slice() == shards[i].as_ref() {
+                        used_backends.push((key, Some(old_metadata.shards()[i].zdb().clone())));
+                        debug!("Shard {} is the SAME", i);
+                    } else {
+                        used_backends.push((key, None));
+                        warn!("Shard {} is DIFFERENT", i);
+                    }
+                } else {
+                    debug!("Shard {} is MISSING", i);
+                    used_backends.push((key, None));
+                }
+            }
+
+            rebuild_data(
+                &mut cfg.deref().clone(),
+                shards,
+                &mut metadata,
+                used_backends,
+            )
+            .await?;

             info!(
                 "Rebuild file from {} to {}",
@@ -471,63 +496,156 @@ async fn load_data(metadata: &MetaData) -> ZstorResult<Vec<Option<Vec<u8>>>> {
     Ok(shards)
 }

-async fn save_data(
+async fn check_backend_space(
+    backend: ZdbConnectionInfo,
+    shard_len: usize,
+) -> ZdbResult<SequentialZdb> {
+    let db = SequentialZdb::new(backend.clone()).await?;
+    let ns_info = db.ns_info().await?;
+    match ns_info.free_space() {
+        insufficient if (insufficient as usize) < shard_len => Err(ZdbError::new_storage_size(
+            db.connection_info().clone(),
+            shard_len,
+            ns_info.free_space() as usize,
+        )),
+        _ => Ok(db),
+    }
+}
+
+async fn find_valid_backends(
+    cfg: &mut Config,
+    shard_len: usize,
+    needed_backends: usize,
+    skip_backends: Vec<(Vec<Key>, Option<ZdbConnectionInfo>)>,
+) -> ZstorResult<Vec<SequentialZdb>> {
+    loop {
+        debug!("Finding backend config");
+        let backends = cfg.shard_stores()?;
+        let mut failed_shards = 0;
+        let mut valid_dbs = Vec::new();
+
+        let handles: Vec<_> = backends
+            .into_iter()
+            .filter(|backend| {
+                !skip_backends
+                    .iter()
+                    .any(|(_, b)| b.as_ref() == Some(backend))
+            })
+            .map(|backend| {
+                tokio::spawn(async move { check_backend_space(backend, shard_len).await })
+            })
+            .collect();
+
+        for result in join_all(handles).await {
+            match result? {
+                Ok(db) => valid_dbs.push(db),
+                Err(e) => {
+                    debug!("Backend error: {}", e);
+                    cfg.remove_shard(e.remote());
+                    failed_shards += 1;
+                }
+            }
+        }
+
+        if valid_dbs.len() >= needed_backends && failed_shards == 0 {
+            return Ok(valid_dbs);
+        }
+
+        debug!("Backend config failed, retrying...");
+    }
+}
+
+async fn rebuild_data(
     cfg: &mut Config,
     shards: Vec<Shard>,
     metadata: &mut MetaData,
+    // used_backends specifies which backends are already used,
+    // which also means we don't need to check them again and their shards are not missing
+    used_backends: Vec<(Vec<Key>, Option<ZdbConnectionInfo>)>,
 ) -> ZstorResult<()> {
     let shard_len = if shards.is_empty() {
         0
     } else {
         shards[0].len()
     };
-
-    let dbs = loop {
-        debug!("Finding backend config");
-        let backends = cfg.shard_stores()?;
-
-        let mut failed_shards: usize = 0;
-        let mut handles: Vec<JoinHandle<ZdbResult<_>>> = Vec::with_capacity(shards.len());
-
-        for backend in backends {
-            handles.push(tokio::spawn(async move {
-                let db = SequentialZdb::new(backend.clone()).await?;
-                // check space in backend
-                let ns_info = db.ns_info().await?;
-                match ns_info.free_space() {
-                    insufficient if (insufficient as usize) < shard_len => {
-                        Err(ZdbError::new_storage_size(
-                            db.connection_info().clone(),
-                            shard_len,
-                            ns_info.free_space() as usize,
-                        ))
-                    }
-                    _ => Ok(db),
-                }
-            }));
+    let mut existing_backends_num = 0;
+    for (_, ci) in used_backends.iter() {
+        if ci.is_some() {
+            existing_backends_num += 1;
         }
+    }

-        let mut dbs = Vec::new();
-        for db in join_all(handles).await {
-            match db? {
-                Err(zdbe) => {
-                    debug!("could not connect to 0-db: {}", zdbe);
-                    cfg.remove_shard(zdbe.remote());
-                    failed_shards += 1;
-                }
-                Ok(db) => dbs.push(db), // no error so healthy db backend
+    let new_dbs = find_valid_backends(
+        cfg,
+        shard_len,
+        shards.len() - existing_backends_num,
+        used_backends.clone(),
+    )
+    .await?;
+
+    // create the key, connection_info, and db for each shard:
+    // - if the backend is already used, we don't need to set the shard,
+    //   hence the None db
+    // - if the backend is not used, we need to set the shard,
+    //   hence the Some(db) which will be used to set the shard
+    let mut new_dbs = new_dbs.into_iter();
+    let mut key_dbs = Vec::new();
+    for (key, ci) in used_backends {
+        match ci {
+            Some(ci) => key_dbs.push((key, ci, None)),
+            None => {
+                // unwrap is safe here because we know we have enough backends from find_valid_backends
+                let db = new_dbs.next().unwrap();
+                key_dbs.push((key, db.connection_info().clone(), Some(db)));
             }
         }
+    }

-        // if we find one we are good
-        if failed_shards == 0 {
-            debug!("found valid backend configuration");
-            break dbs;
-        }
+    let mut handles: Vec<JoinHandle<ZstorResult<_>>> = Vec::with_capacity(shards.len());
+    for ((existing_key, existing_ci, db), (shard_idx, shard)) in
+        key_dbs.into_iter().zip(shards.into_iter().enumerate())
+    {
+        handles.push(tokio::spawn(async move {
+            if let Some(db) = db {
+                let keys = db.set(&shard).await?;
+                Ok(ShardInfo::new(
+                    shard_idx,
+                    shard.checksum(),
+                    keys,
+                    db.connection_info().clone(),
+                ))
+            } else {
+                // no need to db.set if it is an already used backend (shard is not missing)
+                Ok(ShardInfo::new(
+                    shard_idx,
+                    shard.checksum(),
+                    existing_key.clone(),
+                    existing_ci.clone(),
+                ))
+            }
+        }));
+    }
+
+    for shard_info in try_join_all(handles).await? {
+        metadata.add_shard(shard_info?);
+    }

-        debug!("Backend config failed");
+    Ok(())
+}
+
+async fn save_data(
+    cfg: &mut Config,
+    shards: Vec<Shard>,
+    metadata: &mut MetaData,
+) -> ZstorResult<()> {
+    let shard_len = if shards.is_empty() {
+        0
+    } else {
+        shards[0].len()
     };

+    let dbs = find_valid_backends(cfg, shard_len, shards.len(), [].to_vec()).await?;
+
     trace!("store shards in backends");

     let mut handles: Vec<JoinHandle<ZstorResult<_>>> = Vec::with_capacity(shards.len());