From 1b28913613f237452047c4a0acb2129d91f9c856 Mon Sep 17 00:00:00 2001
From: behzad nouri
Date: Wed, 15 Jan 2025 18:10:04 -0600
Subject: [PATCH] removes branch to recover legacy shreds

Legacy shreds are discarded on all clusters:
https://github.com/anza-xyz/agave/blob/91d0d0cae/ledger/src/shred.rs#L1275-L1277
Removing the branch to recover legacy shreds allows further simplification
and optimization of the shred recovery code.
---
 ledger/src/blockstore.rs | 102 +++++++++++++++++----------------------
 ledger/src/shred.rs      |  67 +++++++++----------------
 2 files changed, 69 insertions(+), 100 deletions(-)

diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs
index 61f9ab543c94e1..cd941249cd85de 100644
--- a/ledger/src/blockstore.rs
+++ b/ledger/src/blockstore.rs
@@ -763,10 +763,10 @@ impl Blockstore {
     fn get_recovery_data_shreds<'a>(
         &'a self,
         index: &'a Index,
-        slot: Slot,
         erasure_meta: &'a ErasureMeta,
         prev_inserted_shreds: &'a HashMap<ShredId, Shred>,
     ) -> impl Iterator<Item = Shred> + 'a {
+        let slot = index.slot;
         erasure_meta.data_shreds_indices().filter_map(move |i| {
             let key = ShredId::new(slot, u32::try_from(i).unwrap(), ShredType::Data);
             if let Some(shred) = prev_inserted_shreds.get(&key) {
@@ -792,10 +792,10 @@ impl Blockstore {
     fn get_recovery_coding_shreds<'a>(
         &'a self,
         index: &'a Index,
-        slot: Slot,
         erasure_meta: &'a ErasureMeta,
         prev_inserted_shreds: &'a HashMap<ShredId, Shred>,
     ) -> impl Iterator<Item = Shred> + 'a {
+        let slot = index.slot;
         erasure_meta.coding_shreds_indices().filter_map(move |i| {
             let key = ShredId::new(slot, u32::try_from(i).unwrap(), ShredType::Code);
             if let Some(shred) = prev_inserted_shreds.get(&key) {
@@ -823,19 +823,12 @@ impl Blockstore {
         index: &Index,
         erasure_meta: &ErasureMeta,
         prev_inserted_shreds: &HashMap<ShredId, Shred>,
-        leader_schedule_cache: &LeaderScheduleCache,
         reed_solomon_cache: &ReedSolomonCache,
     ) -> std::result::Result<Vec<Shred>, shred::Error> {
         // Find shreds for this erasure set and try recovery
-        let slot = index.slot;
-        let available_shreds: Vec<_> = self
-            .get_recovery_data_shreds(index, slot, erasure_meta, prev_inserted_shreds)
-            .chain(self.get_recovery_coding_shreds(index, slot, erasure_meta, prev_inserted_shreds))
-            .collect();
-        let get_slot_leader = |slot: Slot| -> Option<Pubkey> {
-            leader_schedule_cache.slot_leader_at(slot, /*bank:*/ None)
-        };
-        shred::recover(available_shreds, reed_solomon_cache, get_slot_leader)
+        let data = self.get_recovery_data_shreds(index, erasure_meta, prev_inserted_shreds);
+        let code = self.get_recovery_coding_shreds(index, erasure_meta, prev_inserted_shreds);
+        shred::recover(data.chain(code), reed_solomon_cache)
     }

     /// Collects and reports [`BlockstoreRocksDbColumnFamilyMetrics`] for the
@@ -942,7 +935,6 @@ impl Blockstore {
         erasure_metas: &'a BTreeMap<ErasureSetId, WorkingEntry<ErasureMeta>>,
         index_working_set: &'a HashMap<u64, IndexMetaWorkingSetEntry>,
         prev_inserted_shreds: &'a HashMap<ShredId, Shred>,
-        leader_schedule_cache: &'a LeaderScheduleCache,
         reed_solomon_cache: &'a ReedSolomonCache,
     ) -> impl Iterator<Item = Vec<Shred>> + 'a {
         // Recovery rules:
@@ -964,7 +956,6 @@ impl Blockstore {
                     index,
                     erasure_meta,
                     prev_inserted_shreds,
-                    leader_schedule_cache,
                     reed_solomon_cache,
                 )
             })?
@@ -987,49 +978,46 @@ impl Blockstore {
         metrics: &mut BlockstoreInsertionMetrics,
     ) {
         let mut start = Measure::start("Shred recovery");
-        if let Some(leader_schedule_cache) = leader_schedule {
-            let mut recovered_shreds = Vec::new();
-            let recovered_data_shreds: Vec<_> = self
-                .try_shred_recovery(
-                    &shred_insertion_tracker.erasure_metas,
-                    &shred_insertion_tracker.index_working_set,
-                    &shred_insertion_tracker.just_inserted_shreds,
-                    leader_schedule_cache,
-                    reed_solomon_cache,
-                )
-                .map(|mut shreds| {
-                    // All shreds should be retransmitted, but because there
-                    // are no more missing data shreds in the erasure batch,
-                    // coding shreds are not stored in blockstore.
-                    recovered_shreds
-                        .extend(shred::drain_coding_shreds(&mut shreds).map(Shred::into_payload));
-                    recovered_shreds.extend(shreds.iter().map(Shred::payload).cloned());
-                    shreds
-                })
-                .collect();
-            if !recovered_shreds.is_empty() {
-                let _ = retransmit_sender.send(recovered_shreds);
-            }
-            for shred in recovered_data_shreds.into_iter().flatten() {
-                metrics.num_recovered += 1;
-                *match self.check_insert_data_shred(
-                    shred,
-                    shred_insertion_tracker,
-                    is_trusted,
-                    leader_schedule,
-                    ShredSource::Recovered,
-                ) {
-                    Err(InsertDataShredError::Exists) => &mut metrics.num_recovered_exists,
-                    Err(InsertDataShredError::InvalidShred) => {
-                        &mut metrics.num_recovered_failed_invalid
-                    }
-                    Err(InsertDataShredError::BlockstoreError(err)) => {
-                        error!("blockstore error: {err}");
-                        &mut metrics.num_recovered_blockstore_error
-                    }
-                    Ok(()) => &mut metrics.num_recovered_inserted,
-                } += 1;
-            }
+        let mut recovered_shreds = Vec::new();
+        let recovered_data_shreds: Vec<_> = self
+            .try_shred_recovery(
+                &shred_insertion_tracker.erasure_metas,
+                &shred_insertion_tracker.index_working_set,
+                &shred_insertion_tracker.just_inserted_shreds,
+                reed_solomon_cache,
+            )
+            .map(|mut shreds| {
+                // All shreds should be retransmitted, but because there
+                // are no more missing data shreds in the erasure batch,
+                // coding shreds are not stored in blockstore.
+                recovered_shreds
+                    .extend(shred::drain_coding_shreds(&mut shreds).map(Shred::into_payload));
+                recovered_shreds.extend(shreds.iter().map(Shred::payload).cloned());
+                shreds
+            })
+            .collect();
+        if !recovered_shreds.is_empty() {
+            let _ = retransmit_sender.send(recovered_shreds);
+        }
+        for shred in recovered_data_shreds.into_iter().flatten() {
+            metrics.num_recovered += 1;
+            *match self.check_insert_data_shred(
+                shred,
+                shred_insertion_tracker,
+                is_trusted,
+                leader_schedule,
+                ShredSource::Recovered,
+            ) {
+                Err(InsertDataShredError::Exists) => &mut metrics.num_recovered_exists,
+                Err(InsertDataShredError::InvalidShred) => {
+                    &mut metrics.num_recovered_failed_invalid
+                }
+                Err(InsertDataShredError::BlockstoreError(err)) => {
+                    error!("blockstore error: {err}");
+                    &mut metrics.num_recovered_blockstore_error
+                }
+                Ok(()) => &mut metrics.num_recovered_inserted,
+            } += 1;
         }
         start.stop();
         metrics.shred_recovery_elapsed_us += start.as_us();
diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs
index cf3b486b2d24c4..c93b3ffab678b9 100644
--- a/ledger/src/shred.rs
+++ b/ledger/src/shred.rs
@@ -55,10 +55,10 @@ pub(crate) use self::shred_code::MAX_CODE_SHREDS_PER_SLOT;
 use {
     self::{shred_code::ShredCode, traits::Shred as _},
     crate::blockstore::{self, MAX_DATA_SHREDS_PER_SLOT},
+    assert_matches::debug_assert_matches,
     bitflags::bitflags,
     num_enum::{IntoPrimitive, TryFromPrimitive},
     rayon::ThreadPool,
-    reed_solomon_erasure::Error::TooFewShardsPresent,
     serde::{Deserialize, Serialize},
     solana_entry::entry::{create_ticks, Entry},
     solana_perf::packet::Packet,
@@ -1107,50 +1107,31 @@ impl TryFrom<u8> for ShredVariant {
 }

 pub(crate) fn recover(
-    shreds: Vec<Shred>,
+    shreds: impl IntoIterator<Item = Shred>,
     reed_solomon_cache: &ReedSolomonCache,
-    get_slot_leader: impl Fn(Slot) -> Option<Pubkey>,
 ) -> Result<Vec<Shred>, Error> {
-    match shreds
-        .first()
-        .ok_or(TooFewShardsPresent)?
-        .common_header()
-        .shred_variant
-    {
-        ShredVariant::LegacyData | ShredVariant::LegacyCode => {
-            let mut shreds = Shredder::try_recovery(shreds, reed_solomon_cache)?;
-            shreds.retain(|shred| {
-                get_slot_leader(shred.slot())
-                    .map(|pubkey| shred.verify(&pubkey))
-                    .unwrap_or_default()
-            });
-            Ok(shreds)
-        }
-        ShredVariant::MerkleCode { .. } | ShredVariant::MerkleData { .. } => {
-            let shreds = shreds
-                .into_iter()
-                .map(merkle::Shred::try_from)
-                .collect::<Result<_, _>>()?;
-            // With Merkle shreds, leader signs the Merkle root of the erasure
-            // batch and all shreds within the same erasure batch have the same
-            // signature.
-            // For recovered shreds, the (unique) signature is copied from
-            // shreds which were received from turbine (or repair) and are
-            // already sig-verified.
-            // The same signature also verifies for recovered shreds because
-            // when reconstructing the Merkle tree for the erasure batch, we
-            // will obtain the same Merkle root.
-            merkle::recover(shreds, reed_solomon_cache)?
-                .map(|shred| {
-                    let shred = Shred::from(shred?);
-                    debug_assert!(get_slot_leader(shred.slot())
-                        .map(|pubkey| shred.verify(&pubkey))
-                        .unwrap_or_default());
-                    Ok(shred)
-                })
-                .collect()
-        }
-    }
+    let shreds = shreds
+        .into_iter()
+        .map(|shred| {
+            debug_assert_matches!(
+                shred.common_header().shred_variant,
+                ShredVariant::MerkleCode { .. } | ShredVariant::MerkleData { .. }
+            );
+            merkle::Shred::try_from(shred)
+        })
+        .collect::<Result<_, _>>()?;
+    // With Merkle shreds, leader signs the Merkle root of the erasure
+    // batch and all shreds within the same erasure batch have the same
+    // signature.
+    // For recovered shreds, the (unique) signature is copied from
+    // shreds which were received from turbine (or repair) and are
+    // already sig-verified.
+    // The same signature also verifies for recovered shreds because
+    // when reconstructing the Merkle tree for the erasure batch, we
+    // will obtain the same Merkle root.
+    merkle::recover(shreds, reed_solomon_cache)?
+        .map(|shred| Ok(Shred::from(shred?)))
+        .collect()
 }

 #[allow(clippy::too_many_arguments)]
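
For readers following the change, the sketch below mirrors the shape of the recovery path after this patch using simplified stand-in types; the Shred, ShredVariant, Error, and recover definitions here are illustrative only and are not the agave definitions. It shows the two points the diff touches: recover now accepts any iterator of shreds and merely debug-asserts that they are Merkle variants (legacy shreds having been discarded earlier during sanitization), and the caller chains the lazy data and coding shred iterators instead of collecting them into an intermediate Vec first.

// Minimal, self-contained sketch of the recovery call shape after this
// patch. All types below are simplified stand-ins, not agave definitions.

#[derive(Debug, Clone, Copy)]
enum ShredVariant {
    MerkleCode,
    MerkleData,
}

#[derive(Debug)]
struct Shred {
    variant: ShredVariant,
    index: u32,
}

#[derive(Debug)]
enum Error {
    // Placeholder for the error cases the real shred::recover can return.
    #[allow(dead_code)]
    TooFewShards,
}

// Mirrors the new signature: recovery takes any iterator of shreds and only
// debug-asserts that they are Merkle variants, since legacy shreds never
// reach this point.
fn recover(shreds: impl IntoIterator<Item = Shred>) -> Result<Vec<Shred>, Error> {
    let shreds: Vec<Shred> = shreds
        .into_iter()
        .inspect(|shred| {
            debug_assert!(matches!(
                shred.variant,
                ShredVariant::MerkleCode | ShredVariant::MerkleData
            ));
        })
        .collect();
    // The real implementation runs Reed-Solomon reconstruction over the
    // erasure batch here; the sketch just returns the input shreds.
    Ok(shreds)
}

fn main() -> Result<(), Error> {
    // Like the new recover_shreds, the caller chains the lazy data and
    // coding shred iterators rather than collecting them up front.
    let data = (0..2).map(|index| Shred { variant: ShredVariant::MerkleData, index });
    let code = (2..4).map(|index| Shred { variant: ShredVariant::MerkleCode, index });
    for shred in recover(data.chain(code))? {
        println!("recovered {:?} shred at index {}", shred.variant, shred.index);
    }
    Ok(())
}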