Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unthrottle repair requests #4485

Merged
merged 2 commits into from
Jan 17, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 23 additions & 3 deletions core/src/repair/repair_generic_traversal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use {
repair::{repair_service::RepairService, serve_repair::ShredRepairType},
},
solana_ledger::{blockstore::Blockstore, blockstore_meta::SlotMeta},
solana_sdk::{clock::Slot, hash::Hash},
std::collections::{HashMap, HashSet},
solana_sdk::{clock::Slot, hash::Hash, timing::timestamp},
std::collections::{hash_map::Entry, HashMap, HashSet},
};

struct GenericTraversal<'a> {
Expand Down Expand Up @@ -47,6 +47,7 @@ pub fn get_unknown_last_index(
slot_meta_cache: &mut HashMap<Slot, Option<SlotMeta>>,
processed_slots: &mut HashSet<Slot>,
limit: usize,
outstanding_repairs: &mut HashMap<ShredRepairType, u64>,
) -> Vec<ShredRepairType> {
let iter = GenericTraversal::new(tree);
let mut unknown_last = Vec::new();
Expand Down Expand Up @@ -74,8 +75,16 @@ pub fn get_unknown_last_index(
unknown_last.sort_by(|(_, _, count1), (_, _, count2)| count2.cmp(count1));
unknown_last
.iter()
.filter_map(|(slot, received, _)| {
let repair_request = ShredRepairType::HighestShred(*slot, *received);
if let Entry::Vacant(entry) = outstanding_repairs.entry(repair_request) {
entry.insert(timestamp());
Some(repair_request)
} else {
None
}
})
.take(limit)
.map(|(slot, received, _)| ShredRepairType::HighestShred(*slot, *received))
.collect()
}

Expand Down Expand Up @@ -115,6 +124,7 @@ pub fn get_closest_completion(
slot_meta_cache: &mut HashMap<Slot, Option<SlotMeta>>,
processed_slots: &mut HashSet<Slot>,
limit: usize,
outstanding_repairs: &mut HashMap<ShredRepairType, u64>,
) -> (Vec<ShredRepairType>, /* processed slots */ usize) {
let mut slot_dists: Vec<(Slot, u64)> = Vec::default();
let iter = GenericTraversal::new(tree);
Expand Down Expand Up @@ -192,6 +202,7 @@ pub fn get_closest_completion(
path_slot,
slot_meta,
limit - repairs.len(),
outstanding_repairs,
);
repairs.extend(new_repairs);
total_processed_slots += 1;
Expand All @@ -217,12 +228,14 @@ pub mod test {
let last_shred = blockstore.meta(0).unwrap().unwrap().received;
let mut slot_meta_cache = HashMap::default();
let mut processed_slots = HashSet::default();
let mut outstanding_requests = HashMap::new();
let repairs = get_unknown_last_index(
&heaviest_subtree_fork_choice,
&blockstore,
&mut slot_meta_cache,
&mut processed_slots,
10,
&mut outstanding_requests,
);
assert_eq!(
repairs,
Expand All @@ -231,22 +244,26 @@ pub mod test {
.map(|slot| ShredRepairType::HighestShred(*slot, last_shred))
.collect::<Vec<_>>()
);
assert_eq!(outstanding_requests.len(), repairs.len());
}

#[test]
fn test_get_closest_completion() {
let (blockstore, heaviest_subtree_fork_choice) = setup_forks();
let mut slot_meta_cache = HashMap::default();
let mut processed_slots = HashSet::default();
let mut outstanding_requests = HashMap::new();
let (repairs, _) = get_closest_completion(
&heaviest_subtree_fork_choice,
&blockstore,
0, // root_slot
&mut slot_meta_cache,
&mut processed_slots,
10,
&mut outstanding_requests,
);
assert_eq!(repairs, []);
assert_eq!(outstanding_requests.len(), repairs.len());

let forks = tr(0) / (tr(1) / (tr(2) / (tr(4))) / (tr(3) / (tr(5))));
let ledger_path = get_tmp_ledger_path!();
Expand All @@ -262,6 +279,7 @@ pub mod test {
let heaviest_subtree_fork_choice = HeaviestSubtreeForkChoice::new_from_tree(forks);
let mut slot_meta_cache = HashMap::default();
let mut processed_slots = HashSet::default();
outstanding_requests = HashMap::new();
sleep_shred_deferment_period();
let (repairs, _) = get_closest_completion(
&heaviest_subtree_fork_choice,
Expand All @@ -270,8 +288,10 @@ pub mod test {
&mut slot_meta_cache,
&mut processed_slots,
1,
&mut outstanding_requests,
);
assert_eq!(repairs, [ShredRepairType::Shred(1, 30)]);
assert_eq!(outstanding_requests.len(), repairs.len());
}

fn add_tree_with_missing_shreds(
Expand Down
72 changes: 66 additions & 6 deletions core/src/repair/repair_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use {
},
solana_streamer::sendmmsg::{batch_send, SendPktsError},
std::{
collections::{HashMap, HashSet},
collections::{hash_map::Entry, HashMap, HashSet},
iter::Iterator,
net::{SocketAddr, UdpSocket},
sync::{
Expand All @@ -59,6 +59,11 @@ use {
const DEFER_REPAIR_THRESHOLD: Duration = Duration::from_millis(200);
const DEFER_REPAIR_THRESHOLD_TICKS: u64 = DEFER_REPAIR_THRESHOLD.as_millis() as u64 / MS_PER_TICK;

// This is the amount of time we will wait for a repair request to be fulfilled
// before making another request. Value is based on reasonable upper bound of
// expected network delays in requesting repairs and receiving shreds.
const REPAIR_REQUEST_TIMEOUT_MS: u64 = 100;

// When requesting repair for a specific shred through the admin RPC, we will
// request up to NUM_PEERS_TO_SAMPLE_FOR_REPAIRS in the event a specific, valid
// target node is not provided. This number was chosen to provide reasonable
Expand Down Expand Up @@ -208,7 +213,7 @@ impl BestRepairsStats {
pub const MAX_REPAIR_LENGTH: usize = 512;
pub const MAX_REPAIR_PER_DUPLICATE: usize = 20;
pub const MAX_DUPLICATE_WAIT_MS: usize = 10_000;
pub const REPAIR_MS: u64 = 100;
pub const REPAIR_MS: u64 = 1;
pub const MAX_ORPHANS: usize = 5;
pub const MAX_UNKNOWN_LAST_INDEX_REPAIRS: usize = 10;
pub const MAX_CLOSEST_COMPLETION_REPAIRS: usize = 100;
Expand Down Expand Up @@ -328,6 +333,8 @@ impl RepairService {
let mut last_stats = Instant::now();
let mut peers_cache = LruCache::new(REPAIR_PEERS_CACHE_CAPACITY);
let mut popular_pruned_forks_requests = HashSet::new();
// Maps a repair that may still be outstanding to the timestamp it was requested.
let mut outstanding_repairs = HashMap::new();

while !exit.load(Ordering::Relaxed) {
let mut set_root_elapsed;
Expand Down Expand Up @@ -399,11 +406,17 @@ impl RepairService {
);
add_votes_elapsed.stop();

// Purge old entries. They've either completed or need to be retried.
outstanding_repairs.retain(|_repair_request, time| {
timestamp().saturating_sub(*time) < REPAIR_REQUEST_TIMEOUT_MS
});

let repairs = match repair_info.wen_restart_repair_slots.clone() {
Some(slots_to_repair) => Self::generate_repairs_for_wen_restart(
blockstore,
MAX_REPAIR_LENGTH,
&slots_to_repair.read().unwrap(),
&mut outstanding_repairs,
),
None => repair_weight.get_best_weighted_repairs(
blockstore,
Expand All @@ -415,6 +428,7 @@ impl RepairService {
MAX_CLOSEST_COMPLETION_REPAIRS,
&mut repair_timing,
&mut best_repairs_stats,
&mut outstanding_repairs,
),
};

Expand Down Expand Up @@ -631,17 +645,33 @@ impl RepairService {
slot: Slot,
slot_meta: &SlotMeta,
max_repairs: usize,
outstanding_repairs: &mut HashMap<ShredRepairType, u64>,
) -> Vec<ShredRepairType> {
Self::generate_repairs_for_slot(blockstore, slot, slot_meta, max_repairs, true)
Self::generate_repairs_for_slot(
blockstore,
slot,
slot_meta,
max_repairs,
true,
outstanding_repairs,
)
}

/// Generates repair requests for any shreds missing from `slot`, without
/// deferring requests based on the slot's most recent shred tick (the
/// `false` flag below disables the DEFER_REPAIR_THRESHOLD_TICKS throttle).
///
/// `outstanding_repairs` maps each in-flight repair request to the
/// timestamp it was issued; requests already present there are skipped by
/// `generate_repairs_for_slot` so we do not re-request before the timeout.
///
/// Returns at most `max_repairs` new `ShredRepairType` requests.
pub fn generate_repairs_for_slot_not_throttled_by_tick(
    blockstore: &Blockstore,
    slot: Slot,
    slot_meta: &SlotMeta,
    max_repairs: usize,
    outstanding_repairs: &mut HashMap<ShredRepairType, u64>,
) -> Vec<ShredRepairType> {
    Self::generate_repairs_for_slot(
        blockstore,
        slot,
        slot_meta,
        max_repairs,
        // Do not throttle by shred tick: issue requests immediately.
        false,
        outstanding_repairs,
    )
}

/// If this slot is missing shreds generate repairs
Expand All @@ -651,6 +681,7 @@ impl RepairService {
slot_meta: &SlotMeta,
max_repairs: usize,
throttle_requests_by_shred_tick: bool,
outstanding_repairs: &mut HashMap<ShredRepairType, u64>,
) -> Vec<ShredRepairType> {
let defer_repair_threshold_ticks = if throttle_requests_by_shred_tick {
DEFER_REPAIR_THRESHOLD_TICKS
Expand Down Expand Up @@ -680,7 +711,14 @@ impl RepairService {
}
}
}
vec![ShredRepairType::HighestShred(slot, slot_meta.received)]

let repair_request = ShredRepairType::HighestShred(slot, slot_meta.received);
if let Entry::Vacant(entry) = outstanding_repairs.entry(repair_request) {
entry.insert(timestamp());
vec![repair_request]
} else {
vec![]
}
} else {
blockstore
.find_missing_data_indexes(
Expand All @@ -692,7 +730,15 @@ impl RepairService {
max_repairs,
)
.into_iter()
.map(|i| ShredRepairType::Shred(slot, i))
.filter_map(|i| {
let repair_request = ShredRepairType::Shred(slot, i);
if let Entry::Vacant(entry) = outstanding_repairs.entry(repair_request) {
entry.insert(timestamp());
Some(repair_request)
} else {
None
}
})
.collect()
}
}
Expand All @@ -703,6 +749,7 @@ impl RepairService {
repairs: &mut Vec<ShredRepairType>,
max_repairs: usize,
slot: Slot,
outstanding_repairs: &mut HashMap<ShredRepairType, u64>,
) {
let mut pending_slots = vec![slot];
while repairs.len() < max_repairs && !pending_slots.is_empty() {
Expand All @@ -713,6 +760,7 @@ impl RepairService {
slot,
&slot_meta,
max_repairs - repairs.len(),
outstanding_repairs,
);
repairs.extend(new_repairs);
let next_slots = slot_meta.next_slots;
Expand All @@ -727,6 +775,7 @@ impl RepairService {
blockstore: &Blockstore,
max_repairs: usize,
slots: &Vec<Slot>,
outstanding_repairs: &mut HashMap<ShredRepairType, u64>,
) -> Vec<ShredRepairType> {
let mut repairs: Vec<ShredRepairType> = Vec::new();
for slot in slots {
Expand All @@ -738,6 +787,7 @@ impl RepairService {
*slot,
&slot_meta,
max_repairs - repairs.len(),
outstanding_repairs,
);
repairs.extend(new_repairs);
} else {
Expand Down Expand Up @@ -911,6 +961,7 @@ impl RepairService {
slot,
&meta,
max_repairs - repairs.len(),
&mut HashMap::default(),
);
repairs.extend(new_repairs);
}
Expand All @@ -933,6 +984,7 @@ impl RepairService {
slot,
&slot_meta,
MAX_REPAIR_PER_DUPLICATE,
&mut HashMap::default(),
))
}
} else {
Expand Down Expand Up @@ -1163,6 +1215,7 @@ mod test {
MAX_CLOSEST_COMPLETION_REPAIRS,
&mut RepairTiming::default(),
&mut BestRepairsStats::default(),
&mut HashMap::default(),
),
vec![
ShredRepairType::Orphan(2),
Expand Down Expand Up @@ -1195,6 +1248,7 @@ mod test {
MAX_CLOSEST_COMPLETION_REPAIRS,
&mut RepairTiming::default(),
&mut BestRepairsStats::default(),
&mut HashMap::default(),
),
vec![ShredRepairType::HighestShred(0, 0)]
);
Expand Down Expand Up @@ -1252,6 +1306,7 @@ mod test {
MAX_CLOSEST_COMPLETION_REPAIRS,
&mut RepairTiming::default(),
&mut BestRepairsStats::default(),
&mut HashMap::default(),
),
expected
);
Expand All @@ -1267,6 +1322,7 @@ mod test {
MAX_CLOSEST_COMPLETION_REPAIRS,
&mut RepairTiming::default(),
&mut BestRepairsStats::default(),
&mut HashMap::default(),
)[..],
expected[0..expected.len() - 2]
);
Expand Down Expand Up @@ -1310,6 +1366,7 @@ mod test {
MAX_CLOSEST_COMPLETION_REPAIRS,
&mut RepairTiming::default(),
&mut BestRepairsStats::default(),
&mut HashMap::default(),
),
expected
);
Expand Down Expand Up @@ -1627,6 +1684,7 @@ mod test {
&blockstore,
max_repairs,
&slots_to_repair,
&mut HashMap::default(),
);
assert!(result.is_empty());

Expand All @@ -1636,6 +1694,7 @@ mod test {
&blockstore,
max_repairs,
&slots_to_repair,
&mut HashMap::default(),
);
assert_eq!(
result,
Expand All @@ -1651,6 +1710,7 @@ mod test {
&blockstore,
max_repairs,
&slots_to_repair,
&mut HashMap::default(),
);
assert_eq!(result.len(), max_repairs);
assert_eq!(
Expand Down
Loading
Loading