diff --git a/core/src/repair/repair_generic_traversal.rs b/core/src/repair/repair_generic_traversal.rs
index b4b330e3d99bac..68d49880650830 100644
--- a/core/src/repair/repair_generic_traversal.rs
+++ b/core/src/repair/repair_generic_traversal.rs
@@ -47,6 +47,7 @@ pub fn get_unknown_last_index(
     slot_meta_cache: &mut HashMap<Slot, Option<SlotMeta>>,
     processed_slots: &mut HashSet<Slot>,
     limit: usize,
+    outstanding_repairs: &mut HashMap<ShredRepairType, u64>,
 ) -> Vec<ShredRepairType> {
     let iter = GenericTraversal::new(tree);
     let mut unknown_last = Vec::new();
@@ -74,8 +75,13 @@ pub fn get_unknown_last_index(
     unknown_last.sort_by(|(_, _, count1), (_, _, count2)| count2.cmp(count1));
     unknown_last
         .iter()
+        .filter_map(|(slot, received, _)| {
+            RepairService::request_repair_if_needed(
+                outstanding_repairs,
+                ShredRepairType::HighestShred(*slot, *received),
+            )
+        })
         .take(limit)
-        .map(|(slot, received, _)| ShredRepairType::HighestShred(*slot, *received))
         .collect()
 }
 
@@ -115,6 +121,7 @@ pub fn get_closest_completion(
     slot_meta_cache: &mut HashMap<Slot, Option<SlotMeta>>,
     processed_slots: &mut HashSet<Slot>,
     limit: usize,
+    outstanding_repairs: &mut HashMap<ShredRepairType, u64>,
 ) -> (Vec<ShredRepairType>, /* processed slots */ usize) {
     let mut slot_dists: Vec<(Slot, u64)> = Vec::default();
     let iter = GenericTraversal::new(tree);
@@ -192,6 +199,7 @@ pub fn get_closest_completion(
                 path_slot,
                 slot_meta,
                 limit - repairs.len(),
+                outstanding_repairs,
             );
             repairs.extend(new_repairs);
             total_processed_slots += 1;
@@ -217,12 +225,14 @@ pub mod test {
         let last_shred = blockstore.meta(0).unwrap().unwrap().received;
         let mut slot_meta_cache = HashMap::default();
         let mut processed_slots = HashSet::default();
+        let mut outstanding_requests = HashMap::new();
         let repairs = get_unknown_last_index(
             &heaviest_subtree_fork_choice,
             &blockstore,
             &mut slot_meta_cache,
             &mut processed_slots,
             10,
+            &mut outstanding_requests,
         );
         assert_eq!(
             repairs,
@@ -231,6 +241,18 @@ pub mod test {
                 .map(|slot| ShredRepairType::HighestShred(*slot, last_shred))
                 .collect::<Vec<_>>()
         );
+        assert_eq!(outstanding_requests.len(), repairs.len());
+
+        // Ensure redundant repairs are not generated.
+        let repairs = get_unknown_last_index(
+            &heaviest_subtree_fork_choice,
+            &blockstore,
+            &mut slot_meta_cache,
+            &mut processed_slots,
+            10,
+            &mut outstanding_requests,
+        );
+        assert_eq!(repairs, []);
     }
 
     #[test]
@@ -238,6 +260,7 @@ pub mod test {
         let (blockstore, heaviest_subtree_fork_choice) = setup_forks();
         let mut slot_meta_cache = HashMap::default();
         let mut processed_slots = HashSet::default();
+        let mut outstanding_requests = HashMap::new();
         let (repairs, _) = get_closest_completion(
             &heaviest_subtree_fork_choice,
             &blockstore,
@@ -245,8 +268,10 @@ pub mod test {
             &mut slot_meta_cache,
             &mut processed_slots,
             10,
+            &mut outstanding_requests,
         );
         assert_eq!(repairs, []);
+        assert_eq!(outstanding_requests.len(), repairs.len());
 
         let forks = tr(0) / (tr(1) / (tr(2) / (tr(4))) / (tr(3) / (tr(5))));
         let ledger_path = get_tmp_ledger_path!();
@@ -262,6 +287,7 @@ pub mod test {
         let heaviest_subtree_fork_choice = HeaviestSubtreeForkChoice::new_from_tree(forks);
         let mut slot_meta_cache = HashMap::default();
         let mut processed_slots = HashSet::default();
+        outstanding_requests = HashMap::new();
         sleep_shred_deferment_period();
         let (repairs, _) = get_closest_completion(
             &heaviest_subtree_fork_choice,
@@ -270,8 +296,35 @@ pub mod test {
             &mut slot_meta_cache,
             &mut processed_slots,
             1,
+            &mut outstanding_requests,
         );
         assert_eq!(repairs, [ShredRepairType::Shred(1, 30)]);
+        assert_eq!(outstanding_requests.len(), repairs.len());
+
+        let (repairs, _) = get_closest_completion(
+            &heaviest_subtree_fork_choice,
+            &blockstore,
+            0, // root_slot
+            &mut slot_meta_cache,
+            &mut processed_slots,
+            4,
+            &mut outstanding_requests,
+        );
+        assert_eq!(repairs.len(), 4);
+        assert_eq!(outstanding_requests.len(), 5);
+
+        // Ensure redundant repairs are not generated.
+        let (repairs, _) = get_closest_completion(
+            &heaviest_subtree_fork_choice,
+            &blockstore,
+            0, // root_slot
+            &mut slot_meta_cache,
+            &mut processed_slots,
+            1,
+            &mut outstanding_requests,
+        );
+        assert_eq!(repairs.len(), 0);
+        assert_eq!(outstanding_requests.len(), 5);
     }
 
     fn add_tree_with_missing_shreds(
diff --git a/core/src/repair/repair_service.rs b/core/src/repair/repair_service.rs
index 0925601b93e9de..7e57e571e37ba3 100644
--- a/core/src/repair/repair_service.rs
+++ b/core/src/repair/repair_service.rs
@@ -42,7 +42,7 @@ use {
     },
     solana_streamer::sendmmsg::{batch_send, SendPktsError},
     std::{
-        collections::{HashMap, HashSet},
+        collections::{hash_map::Entry, HashMap, HashSet},
        iter::Iterator,
        net::{SocketAddr, UdpSocket},
        sync::{
@@ -59,6 +59,11 @@ use {
 const DEFER_REPAIR_THRESHOLD: Duration = Duration::from_millis(200);
 const DEFER_REPAIR_THRESHOLD_TICKS: u64 = DEFER_REPAIR_THRESHOLD.as_millis() as u64 / MS_PER_TICK;
 
+// This is the amount of time we will wait for a repair request to be fulfilled
+// before making another request. Value is based on reasonable upper bound of
+// expected network delays in requesting repairs and receiving shreds.
+const REPAIR_REQUEST_TIMEOUT_MS: u64 = 100;
+
 // When requesting repair for a specific shred through the admin RPC, we will
 // request up to NUM_PEERS_TO_SAMPLE_FOR_REPAIRS in the event a specific, valid
 // target node is not provided. This number was chosen to provide reasonable
@@ -208,7 +213,7 @@ impl BestRepairsStats {
 pub const MAX_REPAIR_LENGTH: usize = 512;
 pub const MAX_REPAIR_PER_DUPLICATE: usize = 20;
 pub const MAX_DUPLICATE_WAIT_MS: usize = 10_000;
-pub const REPAIR_MS: u64 = 100;
+pub const REPAIR_MS: u64 = 1;
 pub const MAX_ORPHANS: usize = 5;
 pub const MAX_UNKNOWN_LAST_INDEX_REPAIRS: usize = 10;
 pub const MAX_CLOSEST_COMPLETION_REPAIRS: usize = 100;
@@ -328,6 +333,8 @@ impl RepairService {
         let mut last_stats = Instant::now();
         let mut peers_cache = LruCache::new(REPAIR_PEERS_CACHE_CAPACITY);
         let mut popular_pruned_forks_requests = HashSet::new();
+        // Maps a repair that may still be outstanding to the timestamp it was requested.
+        let mut outstanding_repairs = HashMap::new();
 
         while !exit.load(Ordering::Relaxed) {
             let mut set_root_elapsed;
@@ -399,11 +406,17 @@ impl RepairService {
             );
             add_votes_elapsed.stop();
 
+            // Purge old entries. They've either completed or need to be retried.
+            outstanding_repairs.retain(|_repair_request, time| {
+                timestamp().saturating_sub(*time) < REPAIR_REQUEST_TIMEOUT_MS
+            });
+
             let repairs = match repair_info.wen_restart_repair_slots.clone() {
                 Some(slots_to_repair) => Self::generate_repairs_for_wen_restart(
                     blockstore,
                     MAX_REPAIR_LENGTH,
                     &slots_to_repair.read().unwrap(),
+                    &mut outstanding_repairs,
                 ),
                 None => repair_weight.get_best_weighted_repairs(
                     blockstore,
@@ -415,6 +428,7 @@ impl RepairService {
                     MAX_CLOSEST_COMPLETION_REPAIRS,
                     &mut repair_timing,
                     &mut best_repairs_stats,
+                    &mut outstanding_repairs,
                 ),
             };
 
@@ -631,8 +645,16 @@ impl RepairService {
        slot: Slot,
        slot_meta: &SlotMeta,
        max_repairs: usize,
+        outstanding_repairs: &mut HashMap<ShredRepairType, u64>,
    ) -> Vec<ShredRepairType> {
-        Self::generate_repairs_for_slot(blockstore, slot, slot_meta, max_repairs, true)
+        Self::generate_repairs_for_slot(
+            blockstore,
+            slot,
+            slot_meta,
+            max_repairs,
+            true,
+            outstanding_repairs,
+        )
    }
 
    pub fn generate_repairs_for_slot_not_throttled_by_tick(
@@ -640,8 +662,16 @@ impl RepairService {
        slot: Slot,
        slot_meta: &SlotMeta,
        max_repairs: usize,
+        outstanding_repairs: &mut HashMap<ShredRepairType, u64>,
    ) -> Vec<ShredRepairType> {
-        Self::generate_repairs_for_slot(blockstore, slot, slot_meta, max_repairs, false)
+        Self::generate_repairs_for_slot(
+            blockstore,
+            slot,
+            slot_meta,
+            max_repairs,
+            false,
+            outstanding_repairs,
+        )
    }
 
    /// If this slot is missing shreds generate repairs
@@ -651,6 +681,7 @@ impl RepairService {
        slot_meta: &SlotMeta,
        max_repairs: usize,
        throttle_requests_by_shred_tick: bool,
+        outstanding_repairs: &mut HashMap<ShredRepairType, u64>,
    ) -> Vec<ShredRepairType> {
        let defer_repair_threshold_ticks = if throttle_requests_by_shred_tick {
            DEFER_REPAIR_THRESHOLD_TICKS
@@ -680,7 +711,14 @@ impl RepairService {
                    }
                }
            }
-            vec![ShredRepairType::HighestShred(slot, slot_meta.received)]
+
+            match RepairService::request_repair_if_needed(
+                outstanding_repairs,
+                ShredRepairType::HighestShred(slot, slot_meta.received),
+            ) {
+                Some(repair_request) => vec![repair_request],
+                None => vec![],
+            }
        } else {
            blockstore
                .find_missing_data_indexes(
@@ -692,7 +730,12 @@ impl RepairService {
                    max_repairs,
                )
                .into_iter()
-                .map(|i| ShredRepairType::Shred(slot, i))
+                .filter_map(|i| {
+                    RepairService::request_repair_if_needed(
+                        outstanding_repairs,
+                        ShredRepairType::Shred(slot, i),
+                    )
+                })
                .collect()
        }
    }
@@ -703,6 +746,7 @@ impl RepairService {
        repairs: &mut Vec<ShredRepairType>,
        max_repairs: usize,
        slot: Slot,
+        outstanding_repairs: &mut HashMap<ShredRepairType, u64>,
    ) {
        let mut pending_slots = vec![slot];
        while repairs.len() < max_repairs && !pending_slots.is_empty() {
@@ -713,6 +757,7 @@ impl RepairService {
                    slot,
                    &slot_meta,
                    max_repairs - repairs.len(),
+                    outstanding_repairs,
                );
                repairs.extend(new_repairs);
                let next_slots = slot_meta.next_slots;
@@ -727,6 +772,7 @@ impl RepairService {
        blockstore: &Blockstore,
        max_repairs: usize,
        slots: &Vec<Slot>,
+        outstanding_repairs: &mut HashMap<ShredRepairType, u64>,
    ) -> Vec<ShredRepairType> {
        let mut repairs: Vec<ShredRepairType> = Vec::new();
        for slot in slots {
@@ -738,6 +784,7 @@ impl RepairService {
                    *slot,
                    &slot_meta,
                    max_repairs - repairs.len(),
+                    outstanding_repairs,
                );
                repairs.extend(new_repairs);
            } else {
@@ -884,6 +931,18 @@ impl RepairService {
        }
    }
 
+    pub fn request_repair_if_needed(
+        outstanding_repairs: &mut HashMap<ShredRepairType, u64>,
+        repair_request: ShredRepairType,
+    ) -> Option<ShredRepairType> {
+        if let Entry::Vacant(entry) = outstanding_repairs.entry(repair_request) {
+            entry.insert(timestamp());
+            Some(repair_request)
+        } else {
+            None
+        }
+    }
+
    /// Generate repairs for all slots `x` in the repair_range.start <= x <= repair_range.end
    #[cfg(test)]
    pub fn generate_repairs_in_range(
@@ -911,6 +970,7 @@ impl RepairService {
                slot,
                &meta,
                max_repairs - repairs.len(),
+                &mut HashMap::default(),
            );
            repairs.extend(new_repairs);
        }
@@ -933,6 +993,7 @@ impl RepairService {
                    slot,
                    &slot_meta,
                    MAX_REPAIR_PER_DUPLICATE,
+                    &mut HashMap::default(),
                ))
            }
        } else {
@@ -1163,6 +1224,7 @@ mod test {
                MAX_CLOSEST_COMPLETION_REPAIRS,
                &mut RepairTiming::default(),
                &mut BestRepairsStats::default(),
+                &mut HashMap::default(),
            ),
            vec![
                ShredRepairType::Orphan(2),
@@ -1195,6 +1257,7 @@ mod test {
                MAX_CLOSEST_COMPLETION_REPAIRS,
                &mut RepairTiming::default(),
                &mut BestRepairsStats::default(),
+                &mut HashMap::default(),
            ),
            vec![ShredRepairType::HighestShred(0, 0)]
        );
@@ -1252,6 +1315,7 @@ mod test {
                MAX_CLOSEST_COMPLETION_REPAIRS,
                &mut RepairTiming::default(),
                &mut BestRepairsStats::default(),
+                &mut HashMap::default(),
            ),
            expected
        );
@@ -1267,6 +1331,7 @@ mod test {
                MAX_CLOSEST_COMPLETION_REPAIRS,
                &mut RepairTiming::default(),
                &mut BestRepairsStats::default(),
+                &mut HashMap::default(),
            )[..],
            expected[0..expected.len() - 2]
        );
@@ -1310,6 +1375,7 @@ mod test {
                MAX_CLOSEST_COMPLETION_REPAIRS,
                &mut RepairTiming::default(),
                &mut BestRepairsStats::default(),
+                &mut HashMap::default(),
            ),
            expected
        );
@@ -1627,6 +1693,7 @@ mod test {
            &blockstore,
            max_repairs,
            &slots_to_repair,
+            &mut HashMap::default(),
        );
        assert!(result.is_empty());
 
@@ -1636,6 +1703,7 @@ mod test {
            &blockstore,
            max_repairs,
            &slots_to_repair,
+            &mut HashMap::default(),
        );
        assert_eq!(
            result,
@@ -1651,6 +1719,7 @@ mod test {
            &blockstore,
            max_repairs,
            &slots_to_repair,
+            &mut HashMap::default(),
        );
        assert_eq!(result.len(), max_repairs);
        assert_eq!(
diff --git a/core/src/repair/repair_weight.rs b/core/src/repair/repair_weight.rs
index cb15c2de27d7e3..c80a3bda6e6c9b 100644
--- a/core/src/repair/repair_weight.rs
+++ b/core/src/repair/repair_weight.rs
@@ -3,7 +3,7 @@ use {
        consensus::{heaviest_subtree_fork_choice::HeaviestSubtreeForkChoice, tree_diff::TreeDiff},
        repair::{
            repair_generic_traversal::{get_closest_completion, get_unknown_last_index},
-            repair_service::{BestRepairsStats, RepairTiming},
+            repair_service::{BestRepairsStats, RepairService, RepairTiming},
            repair_weighted_traversal,
            serve_repair::ShredRepairType,
        },
@@ -214,6 +214,7 @@ impl RepairWeight {
        max_closest_completion_repairs: usize,
        repair_timing: &mut RepairTiming,
        stats: &mut BestRepairsStats,
+        outstanding_repairs: &mut HashMap<ShredRepairType, u64>,
    ) -> Vec<ShredRepairType> {
        let mut repairs = vec![];
        let mut processed_slots = HashSet::from([self.root]);
@@ -228,6 +229,7 @@ impl RepairWeight {
            epoch_stakes,
            epoch_schedule,
            max_new_orphans,
+            outstanding_repairs,
        );
        // Subtract 1 because the root is not processed as an orphan
        let num_orphan_slots = processed_slots.len() - 1;
@@ -242,6 +244,7 @@ impl RepairWeight {
            &mut slot_meta_cache,
            &mut best_shreds_repairs,
            max_new_shreds,
+            outstanding_repairs,
        );
        let num_best_shreds_repairs = best_shreds_repairs.len();
        let repair_slots_set: HashSet<Slot> =
@@ -263,6 +266,7 @@ impl RepairWeight {
            &mut slot_meta_cache,
            &mut processed_slots,
            max_unknown_last_index_repairs,
+            outstanding_repairs,
        );
        let num_unknown_last_index_repairs = unknown_last_index_repairs.len();
        let num_unknown_last_index_slots = processed_slots.len() - pre_num_slots;
@@ -276,6 +280,7 @@ impl RepairWeight {
            &mut slot_meta_cache,
            &mut processed_slots,
            max_closest_completion_repairs,
+            outstanding_repairs,
        );
        let num_closest_completion_repairs = closest_completion_repairs.len();
        let num_closest_completion_slots = processed_slots.len() - pre_num_slots;
@@ -506,6 +511,7 @@ impl RepairWeight {
        slot_meta_cache: &mut HashMap<Slot, Option<SlotMeta>>,
        repairs: &mut Vec<ShredRepairType>,
        max_new_shreds: usize,
+        outstanding_repairs: &mut HashMap<ShredRepairType, u64>,
    ) {
        let root_tree = self.trees.get(&self.root).expect("Root tree must exist");
        repair_weighted_traversal::get_best_repair_shreds(
@@ -514,6 +520,7 @@ impl RepairWeight {
            slot_meta_cache,
            repairs,
            max_new_shreds,
+            outstanding_repairs,
        );
    }
 
@@ -525,6 +532,7 @@ impl RepairWeight {
        epoch_stakes: &HashMap<Epoch, EpochStakes>,
        epoch_schedule: &EpochSchedule,
        max_new_orphans: usize,
+        outstanding_repairs: &mut HashMap<ShredRepairType, u64>,
    ) {
        // Sort each tree in `self.trees`, by the amount of stake that has voted on each,
        // tiebreaker going to earlier slots, thus prioritizing earlier slots on the same fork
@@ -543,9 +551,9 @@ impl RepairWeight {
        // Heavier, smaller slots come first
        Self::sort_by_stake_weight_slot(&mut stake_weighted_trees);
 
-        let mut best_orphans: HashSet<Slot> = HashSet::new();
+        let mut new_best_orphan_requests = 0;
        for (heaviest_tree_root, _) in stake_weighted_trees {
-            if best_orphans.len() >= max_new_orphans {
+            if new_best_orphan_requests >= max_new_orphans {
                break;
            }
            if processed_slots.contains(&heaviest_tree_root) {
@@ -560,28 +568,33 @@ impl RepairWeight {
                    epoch_schedule,
                );
                if let Some(new_orphan_root) = new_orphan_root {
-                    if new_orphan_root != self.root && !best_orphans.contains(&new_orphan_root) {
-                        best_orphans.insert(new_orphan_root);
-                        repairs.push(ShredRepairType::Orphan(new_orphan_root));
-                        processed_slots.insert(new_orphan_root);
+                    if new_orphan_root != self.root {
+                        if let Some(repair_request) = RepairService::request_repair_if_needed(
+                            outstanding_repairs,
+                            ShredRepairType::Orphan(new_orphan_root),
+                        ) {
+                            repairs.push(repair_request);
+                            processed_slots.insert(new_orphan_root);
+                            new_best_orphan_requests += 1;
+                        }
                    }
                }
            }
        }
 
        // If there are fewer than `max_new_orphans`, just grab the next
-        // available ones
-        if best_orphans.len() < max_new_orphans {
-            for new_orphan in blockstore.orphans_iterator(self.root + 1).unwrap() {
-                if !best_orphans.contains(&new_orphan) {
-                    repairs.push(ShredRepairType::Orphan(new_orphan));
-                    best_orphans.insert(new_orphan);
-                    processed_slots.insert(new_orphan);
-                }
-
-                if best_orphans.len() == max_new_orphans {
-                    break;
-                }
+        // available ones.
+        for new_orphan in blockstore.orphans_iterator(self.root + 1).unwrap() {
+            if new_best_orphan_requests >= max_new_orphans {
+                break;
+            }
+            if let Some(repair_request) = RepairService::request_repair_if_needed(
+                outstanding_repairs,
+                ShredRepairType::Orphan(new_orphan),
+            ) {
+                repairs.push(repair_request);
+                processed_slots.insert(new_orphan);
+                new_best_orphan_requests += 1;
            }
        }
    }
@@ -594,6 +607,7 @@ impl RepairWeight {
        slot_meta_cache: &mut HashMap<Slot, Option<SlotMeta>>,
        processed_slots: &mut HashSet<Slot>,
        max_new_repairs: usize,
+        outstanding_repairs: &mut HashMap<ShredRepairType, u64>,
    ) -> Vec<ShredRepairType> {
        let mut repairs = Vec::default();
        for (_slot, tree) in self.trees.iter() {
@@ -606,6 +620,7 @@ impl RepairWeight {
                slot_meta_cache,
                processed_slots,
                max_new_repairs - repairs.len(),
+                outstanding_repairs,
            );
            repairs.extend(new_repairs);
        }
@@ -622,6 +637,7 @@ impl RepairWeight {
        slot_meta_cache: &mut HashMap<Slot, Option<SlotMeta>>,
        processed_slots: &mut HashSet<Slot>,
        max_new_repairs: usize,
+        outstanding_repairs: &mut HashMap<ShredRepairType, u64>,
    ) -> (Vec<ShredRepairType>, /* processed slots */ usize) {
        let mut repairs = Vec::default();
        let mut total_processed_slots = 0;
@@ -636,6 +652,7 @@ impl RepairWeight {
                slot_meta_cache,
                processed_slots,
                max_new_repairs - repairs.len(),
+                outstanding_repairs,
            );
            repairs.extend(new_repairs);
            total_processed_slots += new_processed_slots;
@@ -1486,6 +1503,7 @@ mod test {
        // Ask for only 1 orphan. Because the orphans have the same weight,
        // should prioritize smaller orphan first
        let mut repairs = vec![];
+        let mut outstanding_repairs = HashMap::new();
        let mut processed_slots: HashSet<Slot> = vec![repair_weight.root].into_iter().collect();
        repair_weight.get_best_orphans(
            &blockstore,
@@ -1494,6 +1512,7 @@ mod test {
            bank.epoch_stakes_map(),
            bank.epoch_schedule(),
            1,
+            &mut outstanding_repairs,
        );
        assert_eq!(
            repair_weight
                .unwrap()
        );
        assert_eq!(repairs.len(), 1);
+        assert_eq!(outstanding_repairs.len(), repairs.len());
        assert_eq!(repairs[0].slot(), 8);
 
        // New vote on same orphan branch, without any new slot chaining
        // information blockstore should not resolve the orphan
        repairs = vec![];
+        outstanding_repairs = HashMap::new();
        processed_slots = vec![repair_weight.root].into_iter().collect();
        let votes = vec![(10, vec![vote_pubkeys[0]])];
        repair_weight.add_votes(
@@ -1530,12 +1551,15 @@ mod test {
            bank.epoch_stakes_map(),
            bank.epoch_schedule(),
            1,
+            &mut outstanding_repairs,
        );
        assert_eq!(repairs.len(), 1);
+        assert_eq!(outstanding_repairs.len(), repairs.len());
        assert_eq!(repairs[0].slot(), 8);
 
        // Ask for 2 orphans, should return all the orphans
        repairs = vec![];
+        outstanding_repairs = HashMap::new();
        processed_slots = vec![repair_weight.root].into_iter().collect();
        repair_weight.get_best_orphans(
            &blockstore,
@@ -1544,13 +1568,29 @@ mod test {
            bank.epoch_stakes_map(),
            bank.epoch_schedule(),
            2,
+            &mut outstanding_repairs,
        );
        assert_eq!(repairs.len(), 2);
+        assert_eq!(outstanding_repairs.len(), repairs.len());
        assert_eq!(repairs[0].slot(), 8);
        assert_eq!(repairs[1].slot(), 20);
 
+        // Ensure redundant repairs are not generated.
+        repair_weight.get_best_orphans(
+            &blockstore,
+            &mut processed_slots,
+            &mut repairs,
+            bank.epoch_stakes_map(),
+            bank.epoch_schedule(),
+            2,
+            &mut outstanding_repairs,
+        );
+        assert_eq!(repairs.len(), 2);
+        assert_eq!(outstanding_repairs.len(), repairs.len());
+
        // If one orphan gets heavier, should pick that one
        repairs = vec![];
+        outstanding_repairs = HashMap::new();
        processed_slots = vec![repair_weight.root].into_iter().collect();
        let votes = vec![(20, vec![vote_pubkeys[0]])];
        repair_weight.add_votes(
@@ -1566,13 +1606,16 @@ mod test {
            bank.epoch_stakes_map(),
            bank.epoch_schedule(),
            1,
+            &mut outstanding_repairs,
        );
        assert_eq!(repairs.len(), 1);
+        assert_eq!(outstanding_repairs.len(), repairs.len());
        assert_eq!(repairs[0].slot(), 20);
 
        // Resolve the orphans, should now return no
        // orphans
        repairs = vec![];
+        outstanding_repairs = HashMap::new();
        processed_slots = vec![repair_weight.root].into_iter().collect();
        blockstore.add_tree(tr(6) / (tr(8)), true, true, 2, Hash::default());
        blockstore.add_tree(tr(11) / (tr(20)), true, true, 2, Hash::default());
@@ -1583,8 +1626,10 @@ mod test {
            bank.epoch_stakes_map(),
            bank.epoch_schedule(),
            1,
+            &mut outstanding_repairs,
        );
        assert!(repairs.is_empty());
+        assert!(outstanding_repairs.is_empty());
    }
 
    #[test]
@@ -1611,6 +1656,7 @@ mod test {
        // orphan in the `trees` map, we should search for
        // exactly one more of the remaining two
        let mut repairs = vec![];
+        let mut outstanding_repairs = HashMap::new();
        let mut processed_slots: HashSet<Slot> = vec![repair_weight.root].into_iter().collect();
        blockstore.add_tree(tr(100) / (tr(101)), true, true, 2, Hash::default());
        repair_weight.get_best_orphans(
@@ -1620,13 +1666,16 @@ mod test {
            bank.epoch_stakes_map(),
            bank.epoch_schedule(),
            2,
+            &mut outstanding_repairs,
        );
        assert_eq!(repairs.len(), 2);
+        assert_eq!(outstanding_repairs.len(), repairs.len());
        assert_eq!(repairs[0].slot(), 8);
        assert_eq!(repairs[1].slot(), 20);
 
        // If we ask for 3 orphans, we should get all of them
        let mut repairs = vec![];
+        outstanding_repairs = HashMap::new();
        processed_slots = vec![repair_weight.root].into_iter().collect();
        repair_weight.get_best_orphans(
            &blockstore,
@@ -1635,8 +1684,10 @@ mod test {
            bank.epoch_stakes_map(),
            bank.epoch_schedule(),
            3,
+            &mut outstanding_repairs,
        );
        assert_eq!(repairs.len(), 3);
+        assert_eq!(outstanding_repairs.len(), repairs.len());
        assert_eq!(repairs[0].slot(), 8);
        assert_eq!(repairs[1].slot(), 20);
        assert_eq!(repairs[2].slot(), 100);
@@ -2318,6 +2369,7 @@ mod test {
 
        // Get best orphans works as usual
        let mut repairs = vec![];
+        let mut outstanding_repairs = HashMap::new();
        let mut processed_slots = vec![repair_weight.root].into_iter().collect();
        repair_weight.get_best_orphans(
            &blockstore,
@@ -2326,12 +2378,14 @@ mod test {
            bank.epoch_stakes_map(),
            bank.epoch_schedule(),
            4,
+            &mut outstanding_repairs,
        );
        assert_eq!(repairs.len(), 4);
        assert_eq!(repairs[0].slot(), 10);
        assert_eq!(repairs[1].slot(), 20);
        assert_eq!(repairs[2].slot(), 3);
        assert_eq!(repairs[3].slot(), 8);
+        assert_eq!(outstanding_repairs.len(), repairs.len());
    }
 
    #[test]
@@ -2362,6 +2416,7 @@ mod test {
 
        // Get best orphans works as usual
        let mut repairs = vec![];
+        let mut outstanding_repairs = HashMap::new();
        let mut processed_slots = vec![repair_weight.root].into_iter().collect();
        repair_weight.get_best_orphans(
            &blockstore,
@@ -2370,11 +2425,13 @@ mod test {
            bank.epoch_stakes_map(),
            bank.epoch_schedule(),
            4,
+            &mut outstanding_repairs,
        );
        assert_eq!(repairs.len(), 3);
        assert_eq!(repairs[0].slot(), 6);
        assert_eq!(repairs[1].slot(), 3);
        assert_eq!(repairs[2].slot(), 5);
+        assert_eq!(outstanding_repairs.len(), repairs.len());
 
        // Simulate repair on 6 and 5
        for (shreds, _) in make_chaining_slot_entries(&[5, 6], 100, 0) {
@@ -2383,6 +2440,7 @@ mod test {
 
        // Verify orphans properly updated and chained
        let mut repairs = vec![];
+        outstanding_repairs = HashMap::new();
        let mut processed_slots = vec![repair_weight.root].into_iter().collect();
        repair_weight.get_best_orphans(
            &blockstore,
@@ -2391,9 +2449,11 @@ mod test {
            bank.epoch_stakes_map(),
            bank.epoch_schedule(),
            4,
+            &mut outstanding_repairs,
        );
        assert_eq!(repairs.len(), 1);
        assert_eq!(repairs[0].slot(), 3);
+        assert_eq!(outstanding_repairs.len(), repairs.len());
 
        let mut orphans = repair_weight.trees.keys().copied().collect_vec();
        orphans.sort();
diff --git a/core/src/repair/repair_weighted_traversal.rs b/core/src/repair/repair_weighted_traversal.rs
index bcc4425ada111b..b3c63a1db0cc7e 100644
--- a/core/src/repair/repair_weighted_traversal.rs
+++ b/core/src/repair/repair_weighted_traversal.rs
@@ -78,6 +78,7 @@ pub fn get_best_repair_shreds(
    slot_meta_cache: &mut HashMap<Slot, Option<SlotMeta>>,
    repairs: &mut Vec<ShredRepairType>,
    max_new_shreds: usize,
+    outstanding_repairs: &mut HashMap<ShredRepairType, u64>,
 ) {
    let initial_len = repairs.len();
    let max_repairs = initial_len + max_new_shreds;
@@ -103,6 +104,7 @@ pub fn get_best_repair_shreds(
                        slot,
                        slot_meta,
                        max_repairs - repairs.len(),
+                        outstanding_repairs,
                    );
                    repairs.extend(new_repairs);
                    visited_set.insert(slot);
@@ -122,6 +124,7 @@ pub fn get_best_repair_shreds(
                            repairs,
                            max_repairs,
                            *new_child_slot,
+                            outstanding_repairs,
                        );
                    }
                    visited_set.insert(*new_child_slot);
@@ -219,6 +222,7 @@ pub mod test {
        // `blockstore` and `heaviest_subtree_fork_choice` match exactly, so should
        // return repairs for all slots (none are completed) in order of traversal
        let mut repairs = vec![];
+        let mut outstanding_repairs = HashMap::new();
        let mut slot_meta_cache = HashMap::default();
 
        let last_shred = blockstore.meta(0).unwrap().unwrap().received;
@@ -229,6 +233,7 @@ pub mod test {
            &mut slot_meta_cache,
            &mut repairs,
            6,
+            &mut outstanding_repairs,
        );
        assert_eq!(
            repairs,
                .map(|slot| ShredRepairType::HighestShred(*slot, last_shred))
                .collect::<Vec<_>>()
        );
+        assert_eq!(repairs.len(), outstanding_repairs.len());
 
        // Add some leaves to blockstore, attached to the current best leaf, should prioritize
        // repairing those new leaves before trying other branches
        repairs = vec![];
+        outstanding_repairs = HashMap::new();
        slot_meta_cache = HashMap::default();
        let best_overall_slot = heaviest_subtree_fork_choice.best_overall_slot().0;
        assert_eq!(best_overall_slot, 4);
@@ -258,6 +265,7 @@ pub mod test {
            &mut slot_meta_cache,
            &mut repairs,
            6,
+            &mut outstanding_repairs,
        );
        assert_eq!(
            repairs,
                .map(|slot| ShredRepairType::HighestShred(*slot, last_shred))
                .collect::<Vec<_>>()
        );
+        assert_eq!(repairs.len(), outstanding_repairs.len());
 
        // Completing slots should remove them from the repair list
        repairs = vec![];
+        outstanding_repairs = HashMap::new();
        slot_meta_cache = HashMap::default();
        let completed_shreds: Vec<Shred> = [0, 2, 4, 6]
            .iter()
            .map(|slot| {
@@ -298,6 +308,7 @@ pub mod test {
            &mut slot_meta_cache,
            &mut repairs,
            4,
+            &mut outstanding_repairs,
        );
        assert_eq!(
            repairs,
                .map(|slot| ShredRepairType::HighestShred(*slot, last_shred))
                .collect::<Vec<_>>()
        );
+        assert_eq!(repairs.len(), outstanding_repairs.len());
 
        // Adding incomplete children with higher weighted parents, even if
        // the parents are complete should still be repaired
        repairs = vec![];
+        outstanding_repairs = HashMap::new();
        slot_meta_cache = HashMap::default();
        blockstore.add_tree(tr(2) / (tr(8)), true, false, 2, Hash::default());
        sleep_shred_deferment_period();
@@ -318,15 +331,27 @@ pub mod test {
            &blockstore,
            &mut slot_meta_cache,
            &mut repairs,
-            4,
+            5,
+            &mut outstanding_repairs,
        );
-        assert_eq!(
-            repairs,
-            [1, 7, 8, 3]
-                .iter()
-                .map(|slot| ShredRepairType::HighestShred(*slot, last_shred))
-                .collect::<Vec<_>>()
+        let expected_repairs = [1, 7, 8, 3, 5]
+            .iter()
+            .map(|slot| ShredRepairType::HighestShred(*slot, last_shred))
+            .collect::<Vec<_>>();
+        assert_eq!(repairs, expected_repairs);
+        assert_eq!(repairs.len(), outstanding_repairs.len());
+
+        // Ensure redundant repairs are not generated.
+        get_best_repair_shreds(
+            &heaviest_subtree_fork_choice,
+            &blockstore,
+            &mut slot_meta_cache,
+            &mut repairs,
+            1,
+            &mut outstanding_repairs,
        );
+        assert_eq!(repairs, expected_repairs);
+        assert_eq!(repairs.len(), outstanding_repairs.len());
    }
 
    #[test]
@@ -338,6 +363,7 @@ pub mod test {
 
        sleep_shred_deferment_period();
        let mut repairs = vec![];
+        let mut outstanding_repairs = HashMap::new();
        let mut slot_meta_cache = HashMap::default();
        get_best_repair_shreds(
            &heaviest_subtree_fork_choice,
            &blockstore,
            &mut slot_meta_cache,
            &mut repairs,
            usize::MAX,
+            &mut outstanding_repairs,
        );
        let last_shred = blockstore.meta(0).unwrap().unwrap().received;
        assert_eq!(
            repairs,
                .map(|slot| ShredRepairType::HighestShred(*slot, last_shred))
                .collect::<Vec<_>>()
        );
+        assert_eq!(repairs.len(), outstanding_repairs.len());
    }
 
    fn setup_forks() -> (Blockstore, HeaviestSubtreeForkChoice) {
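
Illustrative sketch (not part of the patch): the change above boils down to a dedup-with-timeout table. A repair is only emitted if it is not already tracked in `outstanding_repairs` (the `Entry::Vacant` check in `request_repair_if_needed`), and entries older than `REPAIR_REQUEST_TIMEOUT_MS` are purged with `retain` at the top of each loop iteration so timed-out requests become eligible again. The standalone Rust code below reduces that pattern to std-only types; `RepairRequest`, `now_ms`, and `REQUEST_TIMEOUT_MS` are hypothetical stand-ins for `ShredRepairType`, `solana_sdk::timing::timestamp()`, and the patch's constant.

use std::collections::{hash_map::Entry, HashMap};
use std::time::{SystemTime, UNIX_EPOCH};

// Stand-in for ShredRepairType; any Hash + Eq key works.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
enum RepairRequest {
    HighestShred(u64 /* slot */, u64 /* index */),
    Shred(u64, u64),
    Orphan(u64),
}

// Stand-in for the patch's REPAIR_REQUEST_TIMEOUT_MS = 100.
const REQUEST_TIMEOUT_MS: u64 = 100;

fn now_ms() -> u64 {
    // Stand-in for solana_sdk::timing::timestamp().
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_millis() as u64
}

/// Mirrors RepairService::request_repair_if_needed: emit the request only if
/// it is not already outstanding, and record when it was issued.
fn request_if_needed(
    outstanding: &mut HashMap<RepairRequest, u64>,
    request: RepairRequest,
) -> Option<RepairRequest> {
    if let Entry::Vacant(entry) = outstanding.entry(request) {
        entry.insert(now_ms());
        Some(request)
    } else {
        None
    }
}

/// Mirrors the retain() call in the main repair loop: drop entries that have
/// timed out so they are eligible to be requested again.
fn purge_expired(outstanding: &mut HashMap<RepairRequest, u64>) {
    let now = now_ms();
    outstanding.retain(|_request, issued_at| now.saturating_sub(*issued_at) < REQUEST_TIMEOUT_MS);
}

fn main() {
    let mut outstanding = HashMap::new();
    let first = request_if_needed(&mut outstanding, RepairRequest::Orphan(8));
    let second = request_if_needed(&mut outstanding, RepairRequest::Orphan(8));
    assert!(first.is_some() && second.is_none()); // exact duplicate suppressed
    purge_expired(&mut outstanding); // entry survives until the timeout elapses
}

Because the table is keyed by the full request (slot plus shred index, or orphan slot), distinct repairs for the same slot are still issued independently; only exact duplicates within the timeout window are suppressed, which is what allows the patch to lower REPAIR_MS without flooding peers with repeated requests.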