Skip to content

Commit

Permalink
unthrottle repair
Browse files Browse the repository at this point in the history
  • Loading branch information
bw-solana committed Jan 16, 2025
1 parent f76c121 commit d9b8221
Show file tree
Hide file tree
Showing 4 changed files with 172 additions and 27 deletions.
26 changes: 23 additions & 3 deletions core/src/repair/repair_generic_traversal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use {
repair::{repair_service::RepairService, serve_repair::ShredRepairType},
},
solana_ledger::{blockstore::Blockstore, blockstore_meta::SlotMeta},
solana_sdk::{clock::Slot, hash::Hash},
std::collections::{HashMap, HashSet},
solana_sdk::{clock::Slot, hash::Hash, timing::timestamp},
std::collections::{hash_map::Entry, HashMap, HashSet},
};

struct GenericTraversal<'a> {
Expand Down Expand Up @@ -47,6 +47,7 @@ pub fn get_unknown_last_index(
slot_meta_cache: &mut HashMap<Slot, Option<SlotMeta>>,
processed_slots: &mut HashSet<Slot>,
limit: usize,
outstanding_repairs: &mut HashMap<ShredRepairType, u64>,
) -> Vec<ShredRepairType> {
let iter = GenericTraversal::new(tree);
let mut unknown_last = Vec::new();
Expand Down Expand Up @@ -74,8 +75,16 @@ pub fn get_unknown_last_index(
unknown_last.sort_by(|(_, _, count1), (_, _, count2)| count2.cmp(count1));
unknown_last
.iter()
.filter_map(|(slot, received, _)| {
let repair_request = ShredRepairType::HighestShred(*slot, *received);
if let Entry::Vacant(entry) = outstanding_repairs.entry(repair_request) {
entry.insert(timestamp());
Some(repair_request)
} else {
None
}
})
.take(limit)
.map(|(slot, received, _)| ShredRepairType::HighestShred(*slot, *received))
.collect()
}

Expand Down Expand Up @@ -115,6 +124,7 @@ pub fn get_closest_completion(
slot_meta_cache: &mut HashMap<Slot, Option<SlotMeta>>,
processed_slots: &mut HashSet<Slot>,
limit: usize,
outstanding_repairs: &mut HashMap<ShredRepairType, u64>,
) -> (Vec<ShredRepairType>, /* processed slots */ usize) {
let mut slot_dists: Vec<(Slot, u64)> = Vec::default();
let iter = GenericTraversal::new(tree);
Expand Down Expand Up @@ -192,6 +202,7 @@ pub fn get_closest_completion(
path_slot,
slot_meta,
limit - repairs.len(),
outstanding_repairs,
);
repairs.extend(new_repairs);
total_processed_slots += 1;
Expand All @@ -217,12 +228,14 @@ pub mod test {
let last_shred = blockstore.meta(0).unwrap().unwrap().received;
let mut slot_meta_cache = HashMap::default();
let mut processed_slots = HashSet::default();
let mut outstanding_requests = HashMap::new();
let repairs = get_unknown_last_index(
&heaviest_subtree_fork_choice,
&blockstore,
&mut slot_meta_cache,
&mut processed_slots,
10,
&mut outstanding_requests,
);
assert_eq!(
repairs,
Expand All @@ -231,22 +244,26 @@ pub mod test {
.map(|slot| ShredRepairType::HighestShred(*slot, last_shred))
.collect::<Vec<_>>()
);
assert_eq!(outstanding_requests.len(), repairs.len());
}

#[test]
fn test_get_closest_completion() {
let (blockstore, heaviest_subtree_fork_choice) = setup_forks();
let mut slot_meta_cache = HashMap::default();
let mut processed_slots = HashSet::default();
let mut outstanding_requests = HashMap::new();
let (repairs, _) = get_closest_completion(
&heaviest_subtree_fork_choice,
&blockstore,
0, // root_slot
&mut slot_meta_cache,
&mut processed_slots,
10,
&mut outstanding_requests,
);
assert_eq!(repairs, []);
assert_eq!(outstanding_requests.len(), repairs.len());

let forks = tr(0) / (tr(1) / (tr(2) / (tr(4))) / (tr(3) / (tr(5))));
let ledger_path = get_tmp_ledger_path!();
Expand All @@ -262,6 +279,7 @@ pub mod test {
let heaviest_subtree_fork_choice = HeaviestSubtreeForkChoice::new_from_tree(forks);
let mut slot_meta_cache = HashMap::default();
let mut processed_slots = HashSet::default();
outstanding_requests = HashMap::new();
sleep_shred_deferment_period();
let (repairs, _) = get_closest_completion(
&heaviest_subtree_fork_choice,
Expand All @@ -270,8 +288,10 @@ pub mod test {
&mut slot_meta_cache,
&mut processed_slots,
1,
&mut outstanding_requests,
);
assert_eq!(repairs, [ShredRepairType::Shred(1, 30)]);
assert_eq!(outstanding_requests.len(), repairs.len());
}

fn add_tree_with_missing_shreds(
Expand Down
72 changes: 66 additions & 6 deletions core/src/repair/repair_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use {
},
solana_streamer::sendmmsg::{batch_send, SendPktsError},
std::{
collections::{HashMap, HashSet},
collections::{hash_map::Entry, HashMap, HashSet},
iter::Iterator,
net::{SocketAddr, UdpSocket},
sync::{
Expand All @@ -59,6 +59,11 @@ use {
const DEFER_REPAIR_THRESHOLD: Duration = Duration::from_millis(200);
const DEFER_REPAIR_THRESHOLD_TICKS: u64 = DEFER_REPAIR_THRESHOLD.as_millis() as u64 / MS_PER_TICK;

// This is the amount of time, in milliseconds, that we will wait for a repair
// request to be fulfilled before issuing the same request again. The value is
// based on a reasonable upper bound of the expected network delays in
// requesting repairs and receiving shreds.
const REPAIR_REQUEST_TIMEOUT_MS: u64 = 100;

// When requesting repair for a specific shred through the admin RPC, we will
// request up to NUM_PEERS_TO_SAMPLE_FOR_REPAIRS in the event a specific, valid
// target node is not provided. This number was chosen to provide reasonable
Expand Down Expand Up @@ -208,7 +213,7 @@ impl BestRepairsStats {
pub const MAX_REPAIR_LENGTH: usize = 512;
pub const MAX_REPAIR_PER_DUPLICATE: usize = 20;
pub const MAX_DUPLICATE_WAIT_MS: usize = 10_000;
pub const REPAIR_MS: u64 = 100;
pub const REPAIR_MS: u64 = 1;
pub const MAX_ORPHANS: usize = 5;
pub const MAX_UNKNOWN_LAST_INDEX_REPAIRS: usize = 10;
pub const MAX_CLOSEST_COMPLETION_REPAIRS: usize = 100;
Expand Down Expand Up @@ -328,6 +333,8 @@ impl RepairService {
let mut last_stats = Instant::now();
let mut peers_cache = LruCache::new(REPAIR_PEERS_CACHE_CAPACITY);
let mut popular_pruned_forks_requests = HashSet::new();
// Maps each repair request that may still be outstanding to the timestamp
// (in milliseconds) at which it was issued.
let mut outstanding_repairs = HashMap::new();

while !exit.load(Ordering::Relaxed) {
let mut set_root_elapsed;
Expand Down Expand Up @@ -399,11 +406,17 @@ impl RepairService {
);
add_votes_elapsed.stop();

// Purge entries that are at least REPAIR_REQUEST_TIMEOUT_MS old. By now they
// have either been fulfilled or need to be retried.
outstanding_repairs.retain(|_repair_request, time| {
timestamp().saturating_sub(*time) < REPAIR_REQUEST_TIMEOUT_MS
});

let repairs = match repair_info.wen_restart_repair_slots.clone() {
Some(slots_to_repair) => Self::generate_repairs_for_wen_restart(
blockstore,
MAX_REPAIR_LENGTH,
&slots_to_repair.read().unwrap(),
&mut outstanding_repairs,
),
None => repair_weight.get_best_weighted_repairs(
blockstore,
Expand All @@ -415,6 +428,7 @@ impl RepairService {
MAX_CLOSEST_COMPLETION_REPAIRS,
&mut repair_timing,
&mut best_repairs_stats,
&mut outstanding_repairs,
),
};

Expand Down Expand Up @@ -631,17 +645,33 @@ impl RepairService {
slot: Slot,
slot_meta: &SlotMeta,
max_repairs: usize,
outstanding_repairs: &mut HashMap<ShredRepairType, u64>,
) -> Vec<ShredRepairType> {
Self::generate_repairs_for_slot(blockstore, slot, slot_meta, max_repairs, true)
Self::generate_repairs_for_slot(
blockstore,
slot,
slot_meta,
max_repairs,
true,
outstanding_repairs,
)
}

pub fn generate_repairs_for_slot_not_throttled_by_tick(
blockstore: &Blockstore,
slot: Slot,
slot_meta: &SlotMeta,
max_repairs: usize,
outstanding_repairs: &mut HashMap<ShredRepairType, u64>,
) -> Vec<ShredRepairType> {
Self::generate_repairs_for_slot(blockstore, slot, slot_meta, max_repairs, false)
Self::generate_repairs_for_slot(
blockstore,
slot,
slot_meta,
max_repairs,
false,
outstanding_repairs,
)
}

/// If this slot is missing shreds generate repairs
Expand All @@ -651,6 +681,7 @@ impl RepairService {
slot_meta: &SlotMeta,
max_repairs: usize,
throttle_requests_by_shred_tick: bool,
outstanding_repairs: &mut HashMap<ShredRepairType, u64>,
) -> Vec<ShredRepairType> {
let defer_repair_threshold_ticks = if throttle_requests_by_shred_tick {
DEFER_REPAIR_THRESHOLD_TICKS
Expand Down Expand Up @@ -680,7 +711,14 @@ impl RepairService {
}
}
}
vec![ShredRepairType::HighestShred(slot, slot_meta.received)]

let repair_request = ShredRepairType::HighestShred(slot, slot_meta.received);
if let Entry::Vacant(entry) = outstanding_repairs.entry(repair_request) {
entry.insert(timestamp());
vec![repair_request]
} else {
vec![]
}
} else {
blockstore
.find_missing_data_indexes(
Expand All @@ -692,7 +730,15 @@ impl RepairService {
max_repairs,
)
.into_iter()
.map(|i| ShredRepairType::Shred(slot, i))
.filter_map(|i| {
let repair_request = ShredRepairType::Shred(slot, i);
if let Entry::Vacant(entry) = outstanding_repairs.entry(repair_request) {
entry.insert(timestamp());
Some(repair_request)
} else {
None
}
})
.collect()
}
}
Expand All @@ -703,6 +749,7 @@ impl RepairService {
repairs: &mut Vec<ShredRepairType>,
max_repairs: usize,
slot: Slot,
outstanding_repairs: &mut HashMap<ShredRepairType, u64>,
) {
let mut pending_slots = vec![slot];
while repairs.len() < max_repairs && !pending_slots.is_empty() {
Expand All @@ -713,6 +760,7 @@ impl RepairService {
slot,
&slot_meta,
max_repairs - repairs.len(),
outstanding_repairs,
);
repairs.extend(new_repairs);
let next_slots = slot_meta.next_slots;
Expand All @@ -727,6 +775,7 @@ impl RepairService {
blockstore: &Blockstore,
max_repairs: usize,
slots: &Vec<Slot>,
outstanding_repairs: &mut HashMap<ShredRepairType, u64>,
) -> Vec<ShredRepairType> {
let mut repairs: Vec<ShredRepairType> = Vec::new();
for slot in slots {
Expand All @@ -738,6 +787,7 @@ impl RepairService {
*slot,
&slot_meta,
max_repairs - repairs.len(),
outstanding_repairs,
);
repairs.extend(new_repairs);
} else {
Expand Down Expand Up @@ -911,6 +961,7 @@ impl RepairService {
slot,
&meta,
max_repairs - repairs.len(),
&mut HashMap::default(),
);
repairs.extend(new_repairs);
}
Expand All @@ -933,6 +984,7 @@ impl RepairService {
slot,
&slot_meta,
MAX_REPAIR_PER_DUPLICATE,
&mut HashMap::default(),
))
}
} else {
Expand Down Expand Up @@ -1163,6 +1215,7 @@ mod test {
MAX_CLOSEST_COMPLETION_REPAIRS,
&mut RepairTiming::default(),
&mut BestRepairsStats::default(),
&mut HashMap::default(),
),
vec![
ShredRepairType::Orphan(2),
Expand Down Expand Up @@ -1195,6 +1248,7 @@ mod test {
MAX_CLOSEST_COMPLETION_REPAIRS,
&mut RepairTiming::default(),
&mut BestRepairsStats::default(),
&mut HashMap::default(),
),
vec![ShredRepairType::HighestShred(0, 0)]
);
Expand Down Expand Up @@ -1252,6 +1306,7 @@ mod test {
MAX_CLOSEST_COMPLETION_REPAIRS,
&mut RepairTiming::default(),
&mut BestRepairsStats::default(),
&mut HashMap::default(),
),
expected
);
Expand All @@ -1267,6 +1322,7 @@ mod test {
MAX_CLOSEST_COMPLETION_REPAIRS,
&mut RepairTiming::default(),
&mut BestRepairsStats::default(),
&mut HashMap::default(),
)[..],
expected[0..expected.len() - 2]
);
Expand Down Expand Up @@ -1310,6 +1366,7 @@ mod test {
MAX_CLOSEST_COMPLETION_REPAIRS,
&mut RepairTiming::default(),
&mut BestRepairsStats::default(),
&mut HashMap::default(),
),
expected
);
Expand Down Expand Up @@ -1627,6 +1684,7 @@ mod test {
&blockstore,
max_repairs,
&slots_to_repair,
&mut HashMap::default(),
);
assert!(result.is_empty());

Expand All @@ -1636,6 +1694,7 @@ mod test {
&blockstore,
max_repairs,
&slots_to_repair,
&mut HashMap::default(),
);
assert_eq!(
result,
Expand All @@ -1651,6 +1710,7 @@ mod test {
&blockstore,
max_repairs,
&slots_to_repair,
&mut HashMap::default(),
);
assert_eq!(result.len(), max_repairs);
assert_eq!(
Expand Down
Loading

0 comments on commit d9b8221

Please sign in to comment.