Skip to content

Commit

Permalink
unthrottle repair
Browse files Browse the repository at this point in the history
  • Loading branch information
bw-solana committed Jan 15, 2025
1 parent 9f7f34b commit be61c3a
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 23 deletions.
14 changes: 14 additions & 0 deletions core/src/repair/repair_generic_traversal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ pub fn get_unknown_last_index(
slot_meta_cache: &mut HashMap<Slot, Option<SlotMeta>>,
processed_slots: &mut HashSet<Slot>,
limit: usize,
outstanding_repairs: &mut HashMap<ShredRepairType, u64>,
) -> Vec<ShredRepairType> {
let iter = GenericTraversal::new(tree);
let mut unknown_last = Vec::new();
Expand Down Expand Up @@ -76,6 +77,14 @@ pub fn get_unknown_last_index(
.iter()
.take(limit)
.map(|(slot, received, _)| ShredRepairType::HighestShred(*slot, *received))
.filter(|repair_request| {
if !outstanding_repairs.contains_key(repair_request) {
outstanding_repairs.insert(*repair_request, 0);
true
} else {
false
}
})
.collect()
}

Expand Down Expand Up @@ -115,6 +124,7 @@ pub fn get_closest_completion(
slot_meta_cache: &mut HashMap<Slot, Option<SlotMeta>>,
processed_slots: &mut HashSet<Slot>,
limit: usize,
outstanding_repairs: &mut HashMap<ShredRepairType, u64>,
) -> (Vec<ShredRepairType>, /* processed slots */ usize) {
let mut slot_dists: Vec<(Slot, u64)> = Vec::default();
let iter = GenericTraversal::new(tree);
Expand Down Expand Up @@ -192,6 +202,7 @@ pub fn get_closest_completion(
path_slot,
slot_meta,
limit - repairs.len(),
outstanding_repairs,
);
repairs.extend(new_repairs);
total_processed_slots += 1;
Expand Down Expand Up @@ -223,6 +234,7 @@ pub mod test {
&mut slot_meta_cache,
&mut processed_slots,
10,
&mut HashMap::default(),
);
assert_eq!(
repairs,
Expand All @@ -245,6 +257,7 @@ pub mod test {
&mut slot_meta_cache,
&mut processed_slots,
10,
&mut HashMap::default(),
);
assert_eq!(repairs, []);

Expand All @@ -270,6 +283,7 @@ pub mod test {
&mut slot_meta_cache,
&mut processed_slots,
1,
&mut HashMap::default(),
);
assert_eq!(repairs, [ShredRepairType::Shred(1, 30)]);
}
Expand Down
80 changes: 75 additions & 5 deletions core/src/repair/repair_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use {
},
solana_streamer::sendmmsg::{batch_send, SendPktsError},
std::{
collections::{HashMap, HashSet},
collections::{hash_map::Entry, HashMap, HashSet},
iter::Iterator,
net::{SocketAddr, UdpSocket},
sync::{
Expand All @@ -59,6 +59,10 @@ use {
const DEFER_REPAIR_THRESHOLD: Duration = Duration::from_millis(200);
const DEFER_REPAIR_THRESHOLD_TICKS: u64 = DEFER_REPAIR_THRESHOLD.as_millis() as u64 / MS_PER_TICK;

// This is the amount of time we will wait for a repair request to be fulfilled
// before making another request.
const REPAIR_REQUEST_TIMEOUT_MS: u64 = 100;

// When requesting repair for a specific shred through the admin RPC, we will
// request up to NUM_PEERS_TO_SAMPLE_FOR_REPAIRS in the event a specific, valid
// target node is not provided. This number was chosen to provide reasonable
Expand Down Expand Up @@ -208,7 +212,7 @@ impl BestRepairsStats {
pub const MAX_REPAIR_LENGTH: usize = 512;
pub const MAX_REPAIR_PER_DUPLICATE: usize = 20;
pub const MAX_DUPLICATE_WAIT_MS: usize = 10_000;
pub const REPAIR_MS: u64 = 100;
pub const REPAIR_MS: u64 = 1;
pub const MAX_ORPHANS: usize = 5;
pub const MAX_UNKNOWN_LAST_INDEX_REPAIRS: usize = 10;
pub const MAX_CLOSEST_COMPLETION_REPAIRS: usize = 100;
Expand Down Expand Up @@ -328,6 +332,7 @@ impl RepairService {
let mut last_stats = Instant::now();
let mut peers_cache = LruCache::new(REPAIR_PEERS_CACHE_CAPACITY);
let mut popular_pruned_forks_requests = HashSet::new();
let mut outstanding_repairs = HashMap::new();

while !exit.load(Ordering::Relaxed) {
let mut set_root_elapsed;
Expand Down Expand Up @@ -399,11 +404,17 @@ impl RepairService {
);
add_votes_elapsed.stop();

// Purge old entries. They've either completed or need to be retried.
// `HashMap::retain` KEEPS the entries for which the closure returns true, so
// the predicate must select requests that have not yet timed out; anything
// at or past REPAIR_REQUEST_TIMEOUT_MS is dropped and may be requested again.
outstanding_repairs.retain(|_repair_request, time| {
    timestamp().saturating_sub(*time) < REPAIR_REQUEST_TIMEOUT_MS
});

let repairs = match repair_info.wen_restart_repair_slots.clone() {
Some(slots_to_repair) => Self::generate_repairs_for_wen_restart(
blockstore,
MAX_REPAIR_LENGTH,
&slots_to_repair.read().unwrap(),
&mut outstanding_repairs,
),
None => repair_weight.get_best_weighted_repairs(
blockstore,
Expand All @@ -415,9 +426,23 @@ impl RepairService {
MAX_CLOSEST_COMPLETION_REPAIRS,
&mut repair_timing,
&mut best_repairs_stats,
&mut outstanding_repairs,
),
};

// Drop any request that already has an entry in `outstanding_repairs`; a
// request seen for the first time is stamped with the current time and kept.
// NOTE(review): `generate_repairs_for_slot` already inserts every request it
// returns into this same map, so those requests look occupied here and are
// removed by this pass — confirm this second filter is intended.
let repairs = repairs
    .into_iter()
    .filter(
        |repair_request| match outstanding_repairs.entry(*repair_request) {
            Entry::Vacant(vacant) => {
                vacant.insert(timestamp());
                true
            }
            Entry::Occupied(_) => false,
        },
    )
    .collect::<Vec<_>>();

let mut popular_pruned_forks = repair_weight.get_popular_pruned_forks(
root_bank.epoch_stakes_map(),
root_bank.epoch_schedule(),
Expand Down Expand Up @@ -631,17 +656,33 @@ impl RepairService {
slot: Slot,
slot_meta: &SlotMeta,
max_repairs: usize,
outstanding_repairs: &mut HashMap<ShredRepairType, u64>,
) -> Vec<ShredRepairType> {
Self::generate_repairs_for_slot(blockstore, slot, slot_meta, max_repairs, true)
Self::generate_repairs_for_slot(
blockstore,
slot,
slot_meta,
max_repairs,
true,
outstanding_repairs,
)
}

/// Generates repair requests for `slot` by delegating to
/// `generate_repairs_for_slot` with `throttle_requests_by_shred_tick`
/// set to `false`.
///
/// `outstanding_repairs` is forwarded unchanged to
/// `generate_repairs_for_slot`, which records each request it returns and
/// omits requests that already have an entry in the map.
pub fn generate_repairs_for_slot_not_throttled_by_tick(
blockstore: &Blockstore,
slot: Slot,
slot_meta: &SlotMeta,
max_repairs: usize,
outstanding_repairs: &mut HashMap<ShredRepairType, u64>,
) -> Vec<ShredRepairType> {
// NOTE(review): this listing shows both sides of the diff. The
// five-argument call directly below appears to be the pre-change line;
// the six-argument call after it matches the current signature of
// `generate_repairs_for_slot`.
Self::generate_repairs_for_slot(blockstore, slot, slot_meta, max_repairs, false)
Self::generate_repairs_for_slot(
blockstore,
slot,
slot_meta,
max_repairs,
false,
outstanding_repairs,
)
}

/// If this slot is missing shreds generate repairs
Expand All @@ -651,6 +692,7 @@ impl RepairService {
slot_meta: &SlotMeta,
max_repairs: usize,
throttle_requests_by_shred_tick: bool,
outstanding_repairs: &mut HashMap<ShredRepairType, u64>,
) -> Vec<ShredRepairType> {
let defer_repair_threshold_ticks = if throttle_requests_by_shred_tick {
DEFER_REPAIR_THRESHOLD_TICKS
Expand Down Expand Up @@ -680,7 +722,13 @@ impl RepairService {
}
}
}
vec![ShredRepairType::HighestShred(slot, slot_meta.received)]
let repair_request = ShredRepairType::HighestShred(slot, slot_meta.received);
if let Entry::Vacant(entry) = outstanding_repairs.entry(repair_request) {
entry.insert(timestamp());
vec![repair_request]
} else {
vec![]
}
} else {
blockstore
.find_missing_data_indexes(
Expand All @@ -693,6 +741,14 @@ impl RepairService {
)
.into_iter()
.map(|i| ShredRepairType::Shred(slot, i))
.filter(|repair_request| {
if !outstanding_repairs.contains_key(repair_request) {
outstanding_repairs.insert(*repair_request, timestamp());
true
} else {
false
}
})
.collect()
}
}
Expand All @@ -703,6 +759,7 @@ impl RepairService {
repairs: &mut Vec<ShredRepairType>,
max_repairs: usize,
slot: Slot,
outstanding_repairs: &mut HashMap<ShredRepairType, u64>,
) {
let mut pending_slots = vec![slot];
while repairs.len() < max_repairs && !pending_slots.is_empty() {
Expand All @@ -713,6 +770,7 @@ impl RepairService {
slot,
&slot_meta,
max_repairs - repairs.len(),
outstanding_repairs,
);
repairs.extend(new_repairs);
let next_slots = slot_meta.next_slots;
Expand All @@ -727,6 +785,7 @@ impl RepairService {
blockstore: &Blockstore,
max_repairs: usize,
slots: &Vec<Slot>,
outstanding_repairs: &mut HashMap<ShredRepairType, u64>,
) -> Vec<ShredRepairType> {
let mut repairs: Vec<ShredRepairType> = Vec::new();
for slot in slots {
Expand All @@ -738,6 +797,7 @@ impl RepairService {
*slot,
&slot_meta,
max_repairs - repairs.len(),
outstanding_repairs,
);
repairs.extend(new_repairs);
} else {
Expand Down Expand Up @@ -911,6 +971,7 @@ impl RepairService {
slot,
&meta,
max_repairs - repairs.len(),
&mut HashMap::default(),
);
repairs.extend(new_repairs);
}
Expand All @@ -933,6 +994,7 @@ impl RepairService {
slot,
&slot_meta,
MAX_REPAIR_PER_DUPLICATE,
&mut HashMap::default(),
))
}
} else {
Expand Down Expand Up @@ -1163,6 +1225,7 @@ mod test {
MAX_CLOSEST_COMPLETION_REPAIRS,
&mut RepairTiming::default(),
&mut BestRepairsStats::default(),
&mut HashMap::new(),
),
vec![
ShredRepairType::Orphan(2),
Expand Down Expand Up @@ -1195,6 +1258,7 @@ mod test {
MAX_CLOSEST_COMPLETION_REPAIRS,
&mut RepairTiming::default(),
&mut BestRepairsStats::default(),
&mut HashMap::new(),
),
vec![ShredRepairType::HighestShred(0, 0)]
);
Expand Down Expand Up @@ -1252,6 +1316,7 @@ mod test {
MAX_CLOSEST_COMPLETION_REPAIRS,
&mut RepairTiming::default(),
&mut BestRepairsStats::default(),
&mut HashMap::new(),
),
expected
);
Expand All @@ -1267,6 +1332,7 @@ mod test {
MAX_CLOSEST_COMPLETION_REPAIRS,
&mut RepairTiming::default(),
&mut BestRepairsStats::default(),
&mut HashMap::new(),
)[..],
expected[0..expected.len() - 2]
);
Expand Down Expand Up @@ -1310,6 +1376,7 @@ mod test {
MAX_CLOSEST_COMPLETION_REPAIRS,
&mut RepairTiming::default(),
&mut BestRepairsStats::default(),
&mut HashMap::new(),
),
expected
);
Expand Down Expand Up @@ -1627,6 +1694,7 @@ mod test {
&blockstore,
max_repairs,
&slots_to_repair,
&mut HashMap::default(),
);
assert!(result.is_empty());

Expand All @@ -1636,6 +1704,7 @@ mod test {
&blockstore,
max_repairs,
&slots_to_repair,
&mut HashMap::default(),
);
assert_eq!(
result,
Expand All @@ -1651,6 +1720,7 @@ mod test {
&blockstore,
max_repairs,
&slots_to_repair,
&mut HashMap::default(),
);
assert_eq!(result.len(), max_repairs);
assert_eq!(
Expand Down
Loading

0 comments on commit be61c3a

Please sign in to comment.