diff --git a/core/src/repair/repair_generic_traversal.rs b/core/src/repair/repair_generic_traversal.rs
index b4b330e3d99bac..73bf662c142913 100644
--- a/core/src/repair/repair_generic_traversal.rs
+++ b/core/src/repair/repair_generic_traversal.rs
@@ -47,6 +47,7 @@ pub fn get_unknown_last_index(
     slot_meta_cache: &mut HashMap<Slot, Option<SlotMeta>>,
     processed_slots: &mut HashSet<Slot>,
     limit: usize,
+    outstanding_repairs: &mut HashMap<ShredRepairType, u64>,
 ) -> Vec<ShredRepairType> {
     let iter = GenericTraversal::new(tree);
     let mut unknown_last = Vec::new();
@@ -76,6 +77,14 @@ pub fn get_unknown_last_index(
         .iter()
         .take(limit)
         .map(|(slot, received, _)| ShredRepairType::HighestShred(*slot, *received))
+        .filter(|repair_request| {
+            if !outstanding_repairs.contains_key(repair_request) {
+                outstanding_repairs.insert(*repair_request, timestamp());
+                true
+            } else {
+                false
+            }
+        })
         .collect()
 }
@@ -115,6 +124,7 @@
     slot_meta_cache: &mut HashMap<Slot, Option<SlotMeta>>,
     processed_slots: &mut HashSet<Slot>,
     limit: usize,
+    outstanding_repairs: &mut HashMap<ShredRepairType, u64>,
 ) -> (Vec<ShredRepairType>, /* processed slots */ usize) {
     let mut slot_dists: Vec<(Slot, u64)> = Vec::default();
     let iter = GenericTraversal::new(tree);
@@ -192,6 +202,7 @@
                 path_slot,
                 slot_meta,
                 limit - repairs.len(),
+                outstanding_repairs,
             );
             repairs.extend(new_repairs);
             total_processed_slots += 1;
@@ -223,6 +234,7 @@ pub mod test {
             &mut slot_meta_cache,
             &mut processed_slots,
             10,
+            &mut HashMap::default(),
         );
         assert_eq!(
             repairs,
@@ -245,6 +257,7 @@ pub mod test {
             &mut slot_meta_cache,
             &mut processed_slots,
             10,
+            &mut HashMap::default(),
         );
         assert_eq!(repairs, []);
@@ -270,6 +283,7 @@
             &mut slot_meta_cache,
             &mut processed_slots,
             1,
+            &mut HashMap::default(),
         );
         assert_eq!(repairs, [ShredRepairType::Shred(1, 30)]);
     }
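The `contains_key`-then-`insert` filter above costs two map lookups per candidate; the hunks below in repair_service.rs and repair_weight.rs express the same registration with a single lookup through the `Entry` API. A minimal, self-contained sketch of that pattern — `RepairKind` and `now_ms` are stand-ins for the crate's `ShredRepairType` and `solana_sdk::timing::timestamp`, not the real types:

```rust
use std::{
    collections::{hash_map::Entry, HashMap},
    time::{SystemTime, UNIX_EPOCH},
};

// Stand-in for ShredRepairType: identifies one repair request.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
enum RepairKind {
    HighestShred(u64 /* slot */, u64 /* shred index */),
}

// Stand-in for solana_sdk::timing::timestamp(): wall-clock millis.
fn now_ms() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_millis() as u64
}

/// Register `request` iff it is not already outstanding, with a single
/// map lookup. Returns true when the caller should actually send it.
fn request_if_needed(outstanding: &mut HashMap<RepairKind, u64>, request: RepairKind) -> bool {
    if let Entry::Vacant(entry) = outstanding.entry(request) {
        entry.insert(now_ms());
        true
    } else {
        false
    }
}

fn main() {
    let mut outstanding = HashMap::new();
    let request = RepairKind::HighestShred(42, 7);
    assert!(request_if_needed(&mut outstanding, request)); // first time: send it
    assert!(!request_if_needed(&mut outstanding, request)); // duplicate: suppressed
    println!("outstanding: {outstanding:?}");
}
```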
diff --git a/core/src/repair/repair_service.rs b/core/src/repair/repair_service.rs
index 0925601b93e9de..a5cba3777a04e1 100644
--- a/core/src/repair/repair_service.rs
+++ b/core/src/repair/repair_service.rs
@@ -42,7 +42,7 @@ use {
     },
     solana_streamer::sendmmsg::{batch_send, SendPktsError},
     std::{
-        collections::{HashMap, HashSet},
+        collections::{hash_map::Entry, HashMap, HashSet},
         iter::Iterator,
         net::{SocketAddr, UdpSocket},
         sync::{
@@ -59,6 +59,10 @@
 const DEFER_REPAIR_THRESHOLD: Duration = Duration::from_millis(200);
 const DEFER_REPAIR_THRESHOLD_TICKS: u64 = DEFER_REPAIR_THRESHOLD.as_millis() as u64 / MS_PER_TICK;
 
+// This is the amount of time we will wait for a repair request to be fulfilled
+// before making another request.
+const REPAIR_REQUEST_TIMEOUT_MS: u64 = 100;
+
 // When requesting repair for a specific shred through the admin RPC, we will
 // request up to NUM_PEERS_TO_SAMPLE_FOR_REPAIRS in the event a specific, valid
 // target node is not provided. This number was chosen to provide reasonable
@@ -208,7 +212,7 @@ impl BestRepairsStats {
 pub const MAX_REPAIR_LENGTH: usize = 512;
 pub const MAX_REPAIR_PER_DUPLICATE: usize = 20;
 pub const MAX_DUPLICATE_WAIT_MS: usize = 10_000;
-pub const REPAIR_MS: u64 = 100;
+pub const REPAIR_MS: u64 = 1;
 pub const MAX_ORPHANS: usize = 5;
 pub const MAX_UNKNOWN_LAST_INDEX_REPAIRS: usize = 10;
 pub const MAX_CLOSEST_COMPLETION_REPAIRS: usize = 100;
@@ -328,6 +332,7 @@ impl RepairService {
         let mut last_stats = Instant::now();
         let mut peers_cache = LruCache::new(REPAIR_PEERS_CACHE_CAPACITY);
         let mut popular_pruned_forks_requests = HashSet::new();
+        let mut outstanding_repairs = HashMap::new();
 
         while !exit.load(Ordering::Relaxed) {
             let mut set_root_elapsed;
@@ -399,11 +404,17 @@ impl RepairService {
                 );
                 add_votes_elapsed.stop();
 
+                // Purge old entries. They've either completed or need to be retried.
+                outstanding_repairs.retain(|_repair_request, time| {
+                    timestamp().saturating_sub(*time) < REPAIR_REQUEST_TIMEOUT_MS
+                });
+
                 let repairs = match repair_info.wen_restart_repair_slots.clone() {
                     Some(slots_to_repair) => Self::generate_repairs_for_wen_restart(
                         blockstore,
                         MAX_REPAIR_LENGTH,
                         &slots_to_repair.read().unwrap(),
+                        &mut outstanding_repairs,
                     ),
                     None => repair_weight.get_best_weighted_repairs(
                         blockstore,
@@ -415,9 +426,10 @@ impl RepairService {
                         MAX_CLOSEST_COMPLETION_REPAIRS,
                         &mut repair_timing,
                         &mut best_repairs_stats,
+                        &mut outstanding_repairs,
                     ),
                 };
 
                 let mut popular_pruned_forks = repair_weight.get_popular_pruned_forks(
                     root_bank.epoch_stakes_map(),
                     root_bank.epoch_schedule(),
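The `retain` above is the map's only eviction point: an entry survives while it is younger than `REPAIR_REQUEST_TIMEOUT_MS`, so a request that was never fulfilled becomes requestable again once it times out; both generation paths then register their own requests, so no separate post-generation filter is needed. A deterministic sketch of that window — integer keys stand in for `ShredRepairType`, and timestamps are passed in explicitly rather than read from the clock:

```rust
use std::collections::HashMap;

const REPAIR_REQUEST_TIMEOUT_MS: u64 = 100;

// Drop entries older than the timeout; fresh entries keep suppressing duplicates.
fn purge_old(outstanding: &mut HashMap<u64, u64>, now: u64) {
    outstanding.retain(|_request, time| now.saturating_sub(*time) < REPAIR_REQUEST_TIMEOUT_MS);
}

fn main() {
    // Keys stand in for repair requests; values are request timestamps in ms.
    let mut outstanding = HashMap::from([(1, 1_000), (2, 1_090)]);

    purge_old(&mut outstanding, 1_095); // both younger than 100ms: kept
    assert_eq!(outstanding.len(), 2);

    purge_old(&mut outstanding, 1_150); // request 1 is 150ms old: purged, retry allowed
    assert!(!outstanding.contains_key(&1));
    assert!(outstanding.contains_key(&2)); // request 2 is 60ms old: still outstanding
}
```

With duplicates suppressed for the whole timeout window, dropping `REPAIR_MS` from 100 to 1 lets the loop react faster without re-sending the same request every millisecond.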
@@ -631,8 +643,16 @@ impl RepairService {
         slot: Slot,
         slot_meta: &SlotMeta,
         max_repairs: usize,
+        outstanding_repairs: &mut HashMap<ShredRepairType, u64>,
     ) -> Vec<ShredRepairType> {
-        Self::generate_repairs_for_slot(blockstore, slot, slot_meta, max_repairs, true)
+        Self::generate_repairs_for_slot(
+            blockstore,
+            slot,
+            slot_meta,
+            max_repairs,
+            true,
+            outstanding_repairs,
+        )
     }
 
     pub fn generate_repairs_for_slot_not_throttled_by_tick(
@@ -640,8 +660,16 @@ impl RepairService {
         slot: Slot,
         slot_meta: &SlotMeta,
         max_repairs: usize,
+        outstanding_repairs: &mut HashMap<ShredRepairType, u64>,
     ) -> Vec<ShredRepairType> {
-        Self::generate_repairs_for_slot(blockstore, slot, slot_meta, max_repairs, false)
+        Self::generate_repairs_for_slot(
+            blockstore,
+            slot,
+            slot_meta,
+            max_repairs,
+            false,
+            outstanding_repairs,
+        )
     }
 
     /// If this slot is missing shreds generate repairs
@@ -651,6 +679,7 @@ impl RepairService {
         slot_meta: &SlotMeta,
         max_repairs: usize,
         throttle_requests_by_shred_tick: bool,
+        outstanding_repairs: &mut HashMap<ShredRepairType, u64>,
     ) -> Vec<ShredRepairType> {
         let defer_repair_threshold_ticks = if throttle_requests_by_shred_tick {
             DEFER_REPAIR_THRESHOLD_TICKS
@@ -680,7 +709,13 @@ impl RepairService {
                     }
                 }
             }
-            vec![ShredRepairType::HighestShred(slot, slot_meta.received)]
+            let repair_request = ShredRepairType::HighestShred(slot, slot_meta.received);
+            if let Entry::Vacant(entry) = outstanding_repairs.entry(repair_request) {
+                entry.insert(timestamp());
+                vec![repair_request]
+            } else {
+                vec![]
+            }
         } else {
             blockstore
                 .find_missing_data_indexes(
@@ -693,6 +728,14 @@ impl RepairService {
                 )
                 .into_iter()
                 .map(|i| ShredRepairType::Shred(slot, i))
+                .filter(|repair_request| {
+                    if !outstanding_repairs.contains_key(repair_request) {
+                        outstanding_repairs.insert(*repair_request, timestamp());
+                        true
+                    } else {
+                        false
+                    }
+                })
                 .collect()
         }
     }
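`generate_repairs_for_slot` turns every hole between `consumed` and `received` into a `Shred(slot, index)` request, then drops the ones already outstanding. A toy version of that pipeline, under the assumption that `missing_indexes` stands in for `Blockstore::find_missing_data_indexes` and `ShredRepair` for the crate's request type:

```rust
use std::collections::{HashMap, HashSet};

#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct ShredRepair {
    slot: u64,
    index: u64,
}

// Stand-in for Blockstore::find_missing_data_indexes: indexes in
// [consumed, received) that are not present in the slot.
fn missing_indexes(present: &HashSet<u64>, consumed: u64, received: u64) -> Vec<u64> {
    (consumed..received).filter(|i| !present.contains(i)).collect()
}

fn main() {
    let slot = 9;
    let present: HashSet<u64> = [0, 1, 3].into();
    let mut outstanding: HashMap<ShredRepair, u64> = HashMap::new();
    outstanding.insert(ShredRepair { slot, index: 2 }, 0); // already requested

    let repairs: Vec<ShredRepair> = missing_indexes(&present, 0, 5)
        .into_iter()
        .map(|index| ShredRepair { slot, index })
        // Same shape as the diff's filter: only not-yet-outstanding requests pass.
        .filter(|repair| {
            if !outstanding.contains_key(repair) {
                outstanding.insert(*repair, 42); // would be timestamp() in the real code
                true
            } else {
                false
            }
        })
        .collect();

    // Indexes 2 and 4 are missing, but 2 is suppressed as outstanding.
    assert_eq!(repairs, vec![ShredRepair { slot, index: 4 }]);
}
```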
@@ -703,6 +746,7 @@ impl RepairService {
         repairs: &mut Vec<ShredRepairType>,
         max_repairs: usize,
         slot: Slot,
+        outstanding_repairs: &mut HashMap<ShredRepairType, u64>,
     ) {
         let mut pending_slots = vec![slot];
         while repairs.len() < max_repairs && !pending_slots.is_empty() {
@@ -713,6 +757,7 @@ impl RepairService {
                 slot,
                 &slot_meta,
                 max_repairs - repairs.len(),
+                outstanding_repairs,
             );
             repairs.extend(new_repairs);
             let next_slots = slot_meta.next_slots;
@@ -727,6 +772,7 @@ impl RepairService {
         blockstore: &Blockstore,
         max_repairs: usize,
         slots: &Vec<Slot>,
+        outstanding_repairs: &mut HashMap<ShredRepairType, u64>,
     ) -> Vec<ShredRepairType> {
         let mut repairs: Vec<ShredRepairType> = Vec::new();
         for slot in slots {
@@ -738,6 +784,7 @@ impl RepairService {
                     *slot,
                     &slot_meta,
                     max_repairs - repairs.len(),
+                    outstanding_repairs,
                 );
                 repairs.extend(new_repairs);
             } else {
@@ -911,6 +958,7 @@ impl RepairService {
                     slot,
                     &meta,
                     max_repairs - repairs.len(),
+                    &mut HashMap::default(),
                 );
                 repairs.extend(new_repairs);
             }
@@ -933,6 +981,7 @@ impl RepairService {
                     slot,
                     &slot_meta,
                     MAX_REPAIR_PER_DUPLICATE,
+                    &mut HashMap::default(),
                 ))
             }
         } else {
@@ -1163,6 +1212,7 @@ mod test {
                 MAX_CLOSEST_COMPLETION_REPAIRS,
                 &mut RepairTiming::default(),
                 &mut BestRepairsStats::default(),
+                &mut HashMap::new(),
             ),
             vec![
                 ShredRepairType::Orphan(2),
@@ -1195,6 +1245,7 @@ mod test {
                 MAX_CLOSEST_COMPLETION_REPAIRS,
                 &mut RepairTiming::default(),
                 &mut BestRepairsStats::default(),
+                &mut HashMap::new(),
             ),
             vec![ShredRepairType::HighestShred(0, 0)]
         );
@@ -1252,6 +1303,7 @@ mod test {
                 MAX_CLOSEST_COMPLETION_REPAIRS,
                 &mut RepairTiming::default(),
                 &mut BestRepairsStats::default(),
+                &mut HashMap::new(),
             ),
             expected
         );
@@ -1267,6 +1319,7 @@ mod test {
                 MAX_CLOSEST_COMPLETION_REPAIRS,
                 &mut RepairTiming::default(),
                 &mut BestRepairsStats::default(),
+                &mut HashMap::new(),
             )[..],
             expected[0..expected.len() - 2]
         );
@@ -1310,6 +1363,7 @@ mod test {
                 MAX_CLOSEST_COMPLETION_REPAIRS,
                 &mut RepairTiming::default(),
                 &mut BestRepairsStats::default(),
+                &mut HashMap::new(),
             ),
             expected
         );
@@ -1627,6 +1681,7 @@ mod test {
             &blockstore,
             max_repairs,
             &slots_to_repair,
+            &mut HashMap::default(),
         );
         assert!(result.is_empty());
 
@@ -1636,6 +1691,7 @@ mod test {
             &blockstore,
             max_repairs,
             &slots_to_repair,
+            &mut HashMap::default(),
         );
         assert_eq!(
             result,
@@ -1651,6 +1707,7 @@ mod test {
             &blockstore,
             max_repairs,
             &slots_to_repair,
+            &mut HashMap::default(),
         );
         assert_eq!(result.len(), max_repairs);
         assert_eq!(
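`generate_repairs_for_fork` drains a pending-slot stack, asking each slot for repairs until the budget is spent or the fork ends. A condensed sketch of that traversal over toy maps — the real code pulls children from each slot's `SlotMeta::next_slots` in the blockstore, and the per-slot repair lists here are invented:

```rust
use std::collections::HashMap;

// Toy stand-ins: children per slot, and repair requests a slot would generate.
fn generate_repairs_for_fork(
    next_slots: &HashMap<u64, Vec<u64>>,
    repairs_per_slot: &HashMap<u64, Vec<u64>>,
    max_repairs: usize,
    start_slot: u64,
) -> Vec<u64> {
    let mut repairs = Vec::new();
    let mut pending_slots = vec![start_slot];
    // Same loop shape as the diff: stop when the budget is spent or the fork ends.
    while repairs.len() < max_repairs && !pending_slots.is_empty() {
        let slot = pending_slots.pop().unwrap();
        let budget = max_repairs - repairs.len();
        let new_repairs = repairs_per_slot.get(&slot).cloned().unwrap_or_default();
        repairs.extend(new_repairs.into_iter().take(budget));
        pending_slots.extend(next_slots.get(&slot).cloned().unwrap_or_default());
    }
    repairs
}

fn main() {
    let next_slots = HashMap::from([(1, vec![2]), (2, vec![3])]);
    let repairs_per_slot = HashMap::from([(1, vec![10, 11]), (2, vec![20]), (3, vec![30])]);
    // A budget of 3 is exhausted before slot 3 is visited.
    assert_eq!(
        generate_repairs_for_fork(&next_slots, &repairs_per_slot, 3, 1),
        vec![10, 11, 20]
    );
}
```

Note that the duplicate-repair paths above pass a throwaway `&mut HashMap::default()`, so admin-triggered repairs are never suppressed by the dedup window.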
diff --git a/core/src/repair/repair_weight.rs b/core/src/repair/repair_weight.rs
index cb15c2de27d7e3..6990040c87cc9a 100644
--- a/core/src/repair/repair_weight.rs
+++ b/core/src/repair/repair_weight.rs
@@ -19,9 +19,10 @@ use {
         epoch_schedule::{Epoch, EpochSchedule},
         hash::Hash,
         pubkey::Pubkey,
+        timing::timestamp,
     },
     std::{
-        collections::{HashMap, HashSet, VecDeque},
+        collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
         iter,
     },
 };
@@ -214,6 +215,7 @@ impl RepairWeight {
         max_closest_completion_repairs: usize,
         repair_timing: &mut RepairTiming,
         stats: &mut BestRepairsStats,
+        outstanding_repairs: &mut HashMap<ShredRepairType, u64>,
     ) -> Vec<ShredRepairType> {
         let mut repairs = vec![];
         let mut processed_slots = HashSet::from([self.root]);
@@ -228,6 +230,7 @@ impl RepairWeight {
             epoch_stakes,
             epoch_schedule,
             max_new_orphans,
+            outstanding_repairs,
         );
         // Subtract 1 because the root is not processed as an orphan
         let num_orphan_slots = processed_slots.len() - 1;
@@ -242,6 +245,7 @@ impl RepairWeight {
             &mut slot_meta_cache,
             &mut best_shreds_repairs,
             max_new_shreds,
+            outstanding_repairs,
         );
         let num_best_shreds_repairs = best_shreds_repairs.len();
         let repair_slots_set: HashSet<Slot> =
@@ -263,6 +267,7 @@ impl RepairWeight {
             &mut slot_meta_cache,
             &mut processed_slots,
             max_unknown_last_index_repairs,
+            outstanding_repairs,
         );
         let num_unknown_last_index_repairs = unknown_last_index_repairs.len();
         let num_unknown_last_index_slots = processed_slots.len() - pre_num_slots;
@@ -276,6 +281,7 @@ impl RepairWeight {
             &mut slot_meta_cache,
             &mut processed_slots,
             max_closest_completion_repairs,
+            outstanding_repairs,
         );
         let num_closest_completion_repairs = closest_completion_repairs.len();
         let num_closest_completion_slots = processed_slots.len() - pre_num_slots;
@@ -506,6 +512,7 @@ impl RepairWeight {
         slot_meta_cache: &mut HashMap<Slot, Option<SlotMeta>>,
         repairs: &mut Vec<ShredRepairType>,
         max_new_shreds: usize,
+        outstanding_repairs: &mut HashMap<ShredRepairType, u64>,
     ) {
         let root_tree = self.trees.get(&self.root).expect("Root tree must exist");
         repair_weighted_traversal::get_best_repair_shreds(
@@ -514,6 +521,7 @@ impl RepairWeight {
             slot_meta_cache,
             repairs,
             max_new_shreds,
+            outstanding_repairs,
         );
     }
 
@@ -525,6 +533,7 @@ impl RepairWeight {
         epoch_stakes: &HashMap<Epoch, EpochStakes>,
         epoch_schedule: &EpochSchedule,
         max_new_orphans: usize,
+        outstanding_repairs: &mut HashMap<ShredRepairType, u64>,
     ) {
         // Sort each tree in `self.trees`, by the amount of stake that has voted on each,
         // tiebreaker going to earlier slots, thus prioritizing earlier slots on the same fork
@@ -543,9 +552,9 @@ impl RepairWeight {
         // Heavier, smaller slots come first
         Self::sort_by_stake_weight_slot(&mut stake_weighted_trees);
 
-        let mut best_orphans: HashSet<Slot> = HashSet::new();
+        let mut new_best_orphan_requests = 0;
         for (heaviest_tree_root, _) in stake_weighted_trees {
-            if best_orphans.len() >= max_new_orphans {
+            if new_best_orphan_requests >= max_new_orphans {
                 break;
             }
             if processed_slots.contains(&heaviest_tree_root) {
@@ -560,28 +569,32 @@ impl RepairWeight {
                 epoch_schedule,
             );
             if let Some(new_orphan_root) = new_orphan_root {
-                if new_orphan_root != self.root && !best_orphans.contains(&new_orphan_root) {
-                    best_orphans.insert(new_orphan_root);
-                    repairs.push(ShredRepairType::Orphan(new_orphan_root));
-                    processed_slots.insert(new_orphan_root);
+                if new_orphan_root != self.root {
+                    let repair_request = ShredRepairType::Orphan(new_orphan_root);
+                    if let Entry::Vacant(entry) = outstanding_repairs.entry(repair_request) {
+                        entry.insert(timestamp());
+                        repairs.push(repair_request);
+                        processed_slots.insert(new_orphan_root);
+                        new_best_orphan_requests += 1;
+                    }
                 }
             }
         }
 
         // If there are fewer than `max_new_orphans`, just grab the next
-        // available ones
-        if best_orphans.len() < max_new_orphans {
-            for new_orphan in blockstore.orphans_iterator(self.root + 1).unwrap() {
-                if !best_orphans.contains(&new_orphan) {
-                    repairs.push(ShredRepairType::Orphan(new_orphan));
-                    best_orphans.insert(new_orphan);
-                    processed_slots.insert(new_orphan);
-                }
+        // available ones.
+        for new_orphan in blockstore.orphans_iterator(self.root + 1).unwrap() {
+            if new_best_orphan_requests >= max_new_orphans {
+                break;
+            }
 
-                if best_orphans.len() == max_new_orphans {
-                    break;
-                }
+            let repair_request = ShredRepairType::Orphan(new_orphan);
+            if let Entry::Vacant(entry) = outstanding_repairs.entry(repair_request) {
+                entry.insert(timestamp());
+                repairs.push(repair_request);
+                processed_slots.insert(new_orphan);
+                new_best_orphan_requests += 1;
             }
         }
     }
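The rewrite above replaces the local `best_orphans` set with a `new_best_orphan_requests` counter: dedup now lives in the shared `outstanding_repairs` map, and the counter only enforces the `max_new_orphans` budget per pass. A reduced model of that budget loop, with plain integers standing in for orphan slots:

```rust
use std::collections::{hash_map::Entry, HashMap};

// Take up to `max_new` orphans that are not already outstanding, registering
// each accepted one; mirrors the counter-plus-Entry loop in get_best_orphans.
fn take_new_orphans(
    candidates: &[u64],
    outstanding: &mut HashMap<u64, u64>,
    max_new: usize,
) -> Vec<u64> {
    let mut new_requests = 0;
    let mut accepted = Vec::new();
    for &orphan in candidates {
        if new_requests >= max_new {
            break;
        }
        if let Entry::Vacant(entry) = outstanding.entry(orphan) {
            entry.insert(0); // the real code stores timestamp()
            accepted.push(orphan);
            new_requests += 1;
        }
    }
    accepted
}

fn main() {
    let mut outstanding = HashMap::from([(8, 0)]); // slot 8 already requested
    // Slot 8 is skipped without consuming budget; 11 and 20 fill the budget of 2.
    assert_eq!(take_new_orphans(&[8, 11, 20, 30], &mut outstanding, 2), vec![11, 20]);
}
```

An already-outstanding orphan no longer consumes budget, so the pass can move on to the next candidate instead of returning short.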
@@ -594,6 +607,7 @@ impl RepairWeight {
         slot_meta_cache: &mut HashMap<Slot, Option<SlotMeta>>,
         processed_slots: &mut HashSet<Slot>,
         max_new_repairs: usize,
+        outstanding_repairs: &mut HashMap<ShredRepairType, u64>,
     ) -> Vec<ShredRepairType> {
         let mut repairs = Vec::default();
         for (_slot, tree) in self.trees.iter() {
@@ -606,6 +620,7 @@ impl RepairWeight {
                 slot_meta_cache,
                 processed_slots,
                 max_new_repairs - repairs.len(),
+                outstanding_repairs,
             );
             repairs.extend(new_repairs);
         }
@@ -622,6 +637,7 @@ impl RepairWeight {
         slot_meta_cache: &mut HashMap<Slot, Option<SlotMeta>>,
         processed_slots: &mut HashSet<Slot>,
         max_new_repairs: usize,
+        outstanding_repairs: &mut HashMap<ShredRepairType, u64>,
     ) -> (Vec<ShredRepairType>, /* processed slots */ usize) {
         let mut repairs = Vec::default();
         let mut total_processed_slots = 0;
@@ -636,6 +652,7 @@ impl RepairWeight {
                 slot_meta_cache,
                 processed_slots,
                 max_new_repairs - repairs.len(),
+                outstanding_repairs,
             );
             repairs.extend(new_repairs);
             total_processed_slots += new_processed_slots;
@@ -1486,6 +1503,7 @@ mod test {
         // Ask for only 1 orphan. Because the orphans have the same weight,
         // should prioritize smaller orphan first
         let mut repairs = vec![];
+        let mut outstanding_repairs = HashMap::new();
         let mut processed_slots: HashSet<Slot> = vec![repair_weight.root].into_iter().collect();
         repair_weight.get_best_orphans(
             &blockstore,
@@ -1494,6 +1512,7 @@ mod test {
             bank.epoch_stakes_map(),
             bank.epoch_schedule(),
             1,
+            &mut outstanding_repairs,
         );
         assert_eq!(
             repair_weight
@@ -1515,6 +1534,7 @@ mod test {
         // New vote on same orphan branch, without any new slot chaining
         // information blockstore should not resolve the orphan
         repairs = vec![];
+        outstanding_repairs = HashMap::new();
         processed_slots = vec![repair_weight.root].into_iter().collect();
         let votes = vec![(10, vec![vote_pubkeys[0]])];
         repair_weight.add_votes(
@@ -1530,12 +1550,14 @@ mod test {
             bank.epoch_stakes_map(),
             bank.epoch_schedule(),
             1,
+            &mut outstanding_repairs,
         );
         assert_eq!(repairs.len(), 1);
         assert_eq!(repairs[0].slot(), 8);
 
         // Ask for 2 orphans, should return all the orphans
         repairs = vec![];
+        outstanding_repairs = HashMap::new();
         processed_slots = vec![repair_weight.root].into_iter().collect();
         repair_weight.get_best_orphans(
             &blockstore,
@@ -1544,6 +1566,7 @@ mod test {
             bank.epoch_stakes_map(),
             bank.epoch_schedule(),
             2,
+            &mut outstanding_repairs,
         );
         assert_eq!(repairs.len(), 2);
         assert_eq!(repairs[0].slot(), 8);
@@ -1551,6 +1574,7 @@ mod test {
 
         // If one orphan gets heavier, should pick that one
         repairs = vec![];
+        outstanding_repairs = HashMap::new();
         processed_slots = vec![repair_weight.root].into_iter().collect();
         let votes = vec![(20, vec![vote_pubkeys[0]])];
         repair_weight.add_votes(
@@ -1566,6 +1590,7 @@ mod test {
             bank.epoch_stakes_map(),
             bank.epoch_schedule(),
             1,
+            &mut outstanding_repairs,
         );
         assert_eq!(repairs.len(), 1);
         assert_eq!(repairs[0].slot(), 20);
@@ -1573,6 +1598,7 @@ mod test {
 
         // Resolve the orphans, should now return no
         // orphans
         repairs = vec![];
+        outstanding_repairs = HashMap::new();
         processed_slots = vec![repair_weight.root].into_iter().collect();
         blockstore.add_tree(tr(6) / (tr(8)), true, true, 2, Hash::default());
         blockstore.add_tree(tr(11) / (tr(20)), true, true, 2, Hash::default());
@@ -1583,6 +1609,7 @@ mod test {
             bank.epoch_stakes_map(),
             bank.epoch_schedule(),
             1,
+            &mut outstanding_repairs,
         );
         assert!(repairs.is_empty());
     }
@@ -1611,6 +1638,7 @@ mod test {
         // orphan in the `trees` map, we should search for
         // exactly one more of the remaining two
         let mut repairs = vec![];
+        let mut outstanding_repairs = HashMap::new();
         let mut processed_slots: HashSet<Slot> = vec![repair_weight.root].into_iter().collect();
         blockstore.add_tree(tr(100) / (tr(101)), true, true, 2, Hash::default());
         repair_weight.get_best_orphans(
             &blockstore,
@@ -1620,6 +1648,7 @@ mod test {
             bank.epoch_stakes_map(),
             bank.epoch_schedule(),
             2,
+            &mut outstanding_repairs,
         );
         assert_eq!(repairs.len(), 2);
         assert_eq!(repairs[0].slot(), 8);
@@ -1627,6 +1656,7 @@ mod test {
 
         // If we ask for 3 orphans, we should get all of them
         let mut repairs = vec![];
+        outstanding_repairs = HashMap::new();
         processed_slots = vec![repair_weight.root].into_iter().collect();
         repair_weight.get_best_orphans(
             &blockstore,
@@ -1635,6 +1665,7 @@ mod test {
             bank.epoch_stakes_map(),
             bank.epoch_schedule(),
             3,
+            &mut outstanding_repairs,
         );
         assert_eq!(repairs.len(), 3);
         assert_eq!(repairs[0].slot(), 8);
@@ -2318,6 +2349,7 @@ mod test {
 
         // Get best orphans works as usual
         let mut repairs = vec![];
+        let mut outstanding_repairs = HashMap::new();
         let mut processed_slots = vec![repair_weight.root].into_iter().collect();
         repair_weight.get_best_orphans(
             &blockstore,
@@ -2326,6 +2358,7 @@ mod test {
             bank.epoch_stakes_map(),
             bank.epoch_schedule(),
             4,
+            &mut outstanding_repairs,
         );
         assert_eq!(repairs.len(), 4);
         assert_eq!(repairs[0].slot(), 10);
@@ -2362,6 +2395,7 @@ mod test {
 
         // Get best orphans works as usual
         let mut repairs = vec![];
+        let mut outstanding_repairs = HashMap::new();
         let mut processed_slots = vec![repair_weight.root].into_iter().collect();
         repair_weight.get_best_orphans(
             &blockstore,
@@ -2370,6 +2404,7 @@ mod test {
             bank.epoch_stakes_map(),
             bank.epoch_schedule(),
             4,
+            &mut outstanding_repairs,
         );
         assert_eq!(repairs.len(), 3);
         assert_eq!(repairs[0].slot(), 6);
@@ -2383,6 +2418,7 @@ mod test {
 
         // Verify orphans properly updated and chained
         let mut repairs = vec![];
+        outstanding_repairs = HashMap::new();
         let mut processed_slots = vec![repair_weight.root].into_iter().collect();
         repair_weight.get_best_orphans(
             &blockstore,
@@ -2391,6 +2427,7 @@ mod test {
             bank.epoch_stakes_map(),
             bank.epoch_schedule(),
             4,
+            &mut outstanding_repairs,
        );
         assert_eq!(repairs.len(), 1);
         assert_eq!(repairs[0].slot(), 3);
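Putting the pieces together, the intended request lifecycle is: register on first send, suppress duplicates while outstanding, purge after `REPAIR_REQUEST_TIMEOUT_MS`, then allow a retry. A self-contained walkthrough under those assumptions, with integer request IDs and a manually advanced clock:

```rust
use std::collections::{hash_map::Entry, HashMap};

const REPAIR_REQUEST_TIMEOUT_MS: u64 = 100;

fn request_if_needed(outstanding: &mut HashMap<u64, u64>, request: u64, now: u64) -> bool {
    if let Entry::Vacant(entry) = outstanding.entry(request) {
        entry.insert(now);
        true
    } else {
        false
    }
}

fn purge_old(outstanding: &mut HashMap<u64, u64>, now: u64) {
    outstanding.retain(|_request, time| now.saturating_sub(*time) < REPAIR_REQUEST_TIMEOUT_MS);
}

fn main() {
    let mut outstanding = HashMap::new();
    let request = 7;

    assert!(request_if_needed(&mut outstanding, request, 0)); // sent at t=0

    purge_old(&mut outstanding, 50); // t=50: still outstanding
    assert!(!request_if_needed(&mut outstanding, request, 50)); // duplicate suppressed

    purge_old(&mut outstanding, 120); // t=120: timed out, entry purged
    assert!(request_if_needed(&mut outstanding, request, 120)); // retried
}
```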
diff --git a/core/src/repair/repair_weighted_traversal.rs b/core/src/repair/repair_weighted_traversal.rs
index bcc4425ada111b..9caabca1480cfb 100644
--- a/core/src/repair/repair_weighted_traversal.rs
+++ b/core/src/repair/repair_weighted_traversal.rs
@@ -78,6 +78,7 @@ pub fn get_best_repair_shreds(
     slot_meta_cache: &mut HashMap<Slot, Option<SlotMeta>>,
     repairs: &mut Vec<ShredRepairType>,
     max_new_shreds: usize,
+    outstanding_repairs: &mut HashMap<ShredRepairType, u64>,
 ) {
     let initial_len = repairs.len();
     let max_repairs = initial_len + max_new_shreds;
@@ -103,6 +104,7 @@ pub fn get_best_repair_shreds(
                     slot,
                     slot_meta,
                     max_repairs - repairs.len(),
+                    outstanding_repairs,
                 );
                 repairs.extend(new_repairs);
                 visited_set.insert(slot);
@@ -122,6 +124,7 @@ pub fn get_best_repair_shreds(
                         repairs,
                         max_repairs,
                         *new_child_slot,
+                        outstanding_repairs,
                     );
                 }
                 visited_set.insert(*new_child_slot);
@@ -229,6 +232,7 @@ pub mod test {
             &mut slot_meta_cache,
             &mut repairs,
             6,
+            &mut HashMap::default(),
         );
         assert_eq!(
             repairs,
@@ -258,6 +262,7 @@ pub mod test {
             &mut slot_meta_cache,
             &mut repairs,
             6,
+            &mut HashMap::default(),
         );
         assert_eq!(
             repairs,
@@ -298,6 +303,7 @@ pub mod test {
             &mut slot_meta_cache,
             &mut repairs,
             4,
+            &mut HashMap::default(),
         );
         assert_eq!(
             repairs,
@@ -319,6 +325,7 @@ pub mod test {
             &mut slot_meta_cache,
             &mut repairs,
             4,
+            &mut HashMap::default(),
         );
         assert_eq!(
             repairs,
@@ -345,6 +352,7 @@ pub mod test {
             &mut slot_meta_cache,
             &mut repairs,
             usize::MAX,
+            &mut HashMap::default(),
         );
         let last_shred = blockstore.meta(0).unwrap().unwrap().received;
         assert_eq!(