diff --git a/core/src/validator.rs b/core/src/validator.rs
index bdc069a9ab894c..c531a6bc3b94e9 100644
--- a/core/src/validator.rs
+++ b/core/src/validator.rs
@@ -1551,6 +1551,7 @@ impl Validator {
                     WAIT_FOR_WEN_RESTART_SUPERMAJORITY_THRESHOLD_PERCENT,
                 snapshot_config: config.snapshot_config.clone(),
                 accounts_background_request_sender: accounts_background_request_sender.clone(),
+                abs_status: accounts_background_service.status().clone(),
                 genesis_config_hash: genesis_config.hash(),
                 exit: exit.clone(),
             })?;
diff --git a/runtime/src/accounts_background_service.rs b/runtime/src/accounts_background_service.rs
index 6c57cea8282e42..75b89fd505735d 100644
--- a/runtime/src/accounts_background_service.rs
+++ b/runtime/src/accounts_background_service.rs
@@ -562,6 +562,7 @@ impl AbsRequestHandlers {
 pub struct AccountsBackgroundService {
     t_background: JoinHandle<()>,
+    status: AbsStatus,
 }
 
 impl AccountsBackgroundService {
@@ -571,131 +572,141 @@ impl AccountsBackgroundService {
         request_handlers: AbsRequestHandlers,
         test_hash_calculation: bool,
     ) -> Self {
+        let is_running = Arc::new(AtomicBool::new(true));
+        let stop = Arc::new(AtomicBool::new(false));
         let mut last_cleaned_block_height = 0;
         let mut removed_slots_count = 0;
         let mut total_remove_slots_time = 0;
         let t_background = Builder::new()
             .name("solBgAccounts".to_string())
-            .spawn(move || {
-                info!("AccountsBackgroundService has started");
-                let mut stats = StatsManager::new();
-                let mut last_snapshot_end_time = None;
-
-                loop {
-                    if exit.load(Ordering::Relaxed) {
-                        break;
-                    }
-                    let start_time = Instant::now();
-
-                    // Grab the current root bank
-                    let bank = bank_forks.read().unwrap().root_bank();
-
-                    // Purge accounts of any dead slots
-                    request_handlers
-                        .pruned_banks_request_handler
-                        .remove_dead_slots(
-                            &bank,
-                            &mut removed_slots_count,
-                            &mut total_remove_slots_time,
-                        );
-
-                    let non_snapshot_time = last_snapshot_end_time
-                        .map(|last_snapshot_end_time: Instant| {
-                            last_snapshot_end_time.elapsed().as_micros()
-                        })
-                        .unwrap_or_default();
-
-                    // Check to see if there were any requests for snapshotting banks
-                    // < the current root bank `bank` above.
-
-                    // Claim: Any snapshot request for slot `N` found here implies that the last cleanup
-                    // slot `M` satisfies `M < N`
-                    //
-                    // Proof: Assume for contradiction that we find a snapshot request for slot `N` here,
-                    // but cleanup has already happened on some slot `M >= N`. Because the call to
-                    // `bank.clean_accounts(true)` (in the code below) implies we only clean slots `<= bank - 1`,
-                    // then that means in some *previous* iteration of this loop, we must have gotten a root
-                    // bank for some slot `R` where `R > N`, but did not see the snapshot for `N` in the
-                    // snapshot request channel.
-                    //
-                    // However, this is impossible because BankForks.set_root() will always flush the snapshot
-                    // request for `N` to the snapshot request channel before setting a root `R > N`, and
-                    // snapshot_request_handler.handle_requests() will always look for the latest
-                    // available snapshot in the channel.
-                    //
-                    // NOTE: We must wait for startup verification to complete before handling
-                    // snapshot requests. This is because startup verification and snapshot
-                    // request handling can both kick off accounts hash calculations in background
-                    // threads, and these must not happen concurrently.
-                    let snapshot_handle_result = bank
-                        .is_startup_verification_complete()
-                        .then(|| {
-                            request_handlers.handle_snapshot_requests(
-                                test_hash_calculation,
-                                non_snapshot_time,
-                                &exit,
-                            )
-                        })
-                        .flatten();
-                    if snapshot_handle_result.is_some() {
-                        last_snapshot_end_time = Some(Instant::now());
-                    }
-
-                    // Note that the flush will do an internal clean of the
-                    // cache up to bank.slot(), so should be safe as long
-                    // as any later snapshots that are taken are of
-                    // slots >= bank.slot()
-                    bank.flush_accounts_cache_if_needed();
-
-                    if let Some(snapshot_handle_result) = snapshot_handle_result {
-                        // Safe, see proof above
+            .spawn({
+                let is_running = is_running.clone();
+                let stop = stop.clone();
+
+                move || {
+                    info!("AccountsBackgroundService has started");
+                    let mut stats = StatsManager::new();
+                    let mut last_snapshot_end_time = None;
+
+                    loop {
+                        if exit.load(Ordering::Relaxed) || stop.load(Ordering::Relaxed) {
+                            break;
+                        }
+                        let start_time = Instant::now();
+
+                        // Grab the current root bank
+                        let bank = bank_forks.read().unwrap().root_bank();
+
+                        // Purge accounts of any dead slots
+                        request_handlers
+                            .pruned_banks_request_handler
+                            .remove_dead_slots(
+                                &bank,
+                                &mut removed_slots_count,
+                                &mut total_remove_slots_time,
+                            );
+
+                        let non_snapshot_time = last_snapshot_end_time
+                            .map(|last_snapshot_end_time: Instant| {
+                                last_snapshot_end_time.elapsed().as_micros()
+                            })
+                            .unwrap_or_default();
+
+                        // Check to see if there were any requests for snapshotting banks
+                        // < the current root bank `bank` above.
+
+                        // Claim: Any snapshot request for slot `N` found here implies that the last cleanup
+                        // slot `M` satisfies `M < N`
+                        //
+                        // Proof: Assume for contradiction that we find a snapshot request for slot `N` here,
+                        // but cleanup has already happened on some slot `M >= N`. Because the call to
+                        // `bank.clean_accounts(true)` (in the code below) implies we only clean slots `<= bank - 1`,
+                        // then that means in some *previous* iteration of this loop, we must have gotten a root
+                        // bank for some slot `R` where `R > N`, but did not see the snapshot for `N` in the
+                        // snapshot request channel.
+                        //
+                        // However, this is impossible because BankForks.set_root() will always flush the snapshot
+                        // request for `N` to the snapshot request channel before setting a root `R > N`, and
+                        // snapshot_request_handler.handle_requests() will always look for the latest
+                        // available snapshot in the channel.
+                        //
+                        // NOTE: We must wait for startup verification to complete before handling
+                        // snapshot requests. This is because startup verification and snapshot
+                        // request handling can both kick off accounts hash calculations in background
+                        // threads, and these must not happen concurrently.
+                        let snapshot_handle_result = bank
+                            .is_startup_verification_complete()
+                            .then(|| {
+                                request_handlers.handle_snapshot_requests(
+                                    test_hash_calculation,
+                                    non_snapshot_time,
+                                    &exit,
+                                )
+                            })
+                            .flatten();
+                        if snapshot_handle_result.is_some() {
+                            last_snapshot_end_time = Some(Instant::now());
+                        }
-                        match snapshot_handle_result {
-                            Ok(snapshot_block_height) => {
-                                assert!(last_cleaned_block_height <= snapshot_block_height);
-                                last_cleaned_block_height = snapshot_block_height;
+
+                        // Note that the flush will do an internal clean of the
+                        // cache up to bank.slot(), so should be safe as long
+                        // as any later snapshots that are taken are of
+                        // slots >= bank.slot()
+                        bank.flush_accounts_cache_if_needed();
+
+                        if let Some(snapshot_handle_result) = snapshot_handle_result {
+                            // Safe, see proof above
+
+                            match snapshot_handle_result {
+                                Ok(snapshot_block_height) => {
+                                    assert!(last_cleaned_block_height <= snapshot_block_height);
+                                    last_cleaned_block_height = snapshot_block_height;
+                                }
+                                Err(err) => {
+                                    error!("Stopping AccountsBackgroundService! Fatal error while handling snapshot requests: {err}");
+                                    exit.store(true, Ordering::Relaxed);
+                                    break;
+                                }
+                            }
-                            }
-                            Err(err) => {
-                                error!("Stopping AccountsBackgroundService! Fatal error while handling snapshot requests: {err}");
-                                exit.store(true, Ordering::Relaxed);
-                                break;
-                            }
-                        }
-                    } else {
-                        if bank.block_height() - last_cleaned_block_height
-                            > (CLEAN_INTERVAL_BLOCKS + thread_rng().gen_range(0..10))
-                        {
-                            // Note that the flush will do an internal clean of the
-                            // cache up to bank.slot(), so should be safe as long
-                            // as any later snapshots that are taken are of
-                            // slots >= bank.slot()
-                            bank.force_flush_accounts_cache();
-                            bank.clean_accounts();
-                            last_cleaned_block_height = bank.block_height();
-                            // See justification below for why we skip 'shrink' here.
+                        } else {
+                            if bank.block_height() - last_cleaned_block_height
+                                > (CLEAN_INTERVAL_BLOCKS + thread_rng().gen_range(0..10))
+                            {
+                                // Note that the flush will do an internal clean of the
+                                // cache up to bank.slot(), so should be safe as long
+                                // as any later snapshots that are taken are of
+                                // slots >= bank.slot()
+                                bank.force_flush_accounts_cache();
+                                bank.clean_accounts();
+                                last_cleaned_block_height = bank.block_height();
+                                // See justification below for why we skip 'shrink' here.
+                                if bank.is_startup_verification_complete() {
+                                    bank.shrink_ancient_slots();
+                                }
+                            }
+                            // Do not 'shrink' until *after* the startup verification is complete.
+                            // This is because startup verification needs to get the snapshot
+                            // storages *as they existed at startup* (to calculate the accounts hash).
+                            // If 'shrink' were to run, then it is possible startup verification
+                            // (1) could race with 'shrink', and fail to assert that shrinking is not in
+                            // progress, or (2) could get snapshot storages that were newer than what
+                            // was in the snapshot itself.
+                            if bank.is_startup_verification_complete() {
+                                bank.shrink_candidate_slots();
+                            }
+                        }
-                            if bank.is_startup_verification_complete() {
-                                bank.shrink_ancient_slots();
-                            }
-                        }
-                        // Do not 'shrink' until *after* the startup verification is complete.
-                        // This is because startup verification needs to get the snapshot
-                        // storages *as they existed at startup* (to calculate the accounts hash).
-                        // If 'shrink' were to run, then it is possible startup verification
-                        // (1) could race with 'shrink', and fail to assert that shrinking is not in
-                        // progress, or (2) could get snapshot storages that were newer than what
-                        // was in the snapshot itself.
-                        if bank.is_startup_verification_complete() {
-                            bank.shrink_candidate_slots();
-                        }
-                    }
+                        stats.record_and_maybe_submit(start_time.elapsed());
+                        sleep(Duration::from_millis(INTERVAL_MS));
+                    }
-                    stats.record_and_maybe_submit(start_time.elapsed());
-                    sleep(Duration::from_millis(INTERVAL_MS));
-                }
-                info!("AccountsBackgroundService has stopped");
-            })
+                    info!("AccountsBackgroundService has stopped");
+                    is_running.store(false, Ordering::Relaxed);
+                }
+            })
             .unwrap();
-        Self { t_background }
+        Self {
+            t_background,
+            status: AbsStatus { is_running, stop },
+        }
     }
 
     /// Should be called immediately after bank_fork_utils::load_bank_forks(), and as such, there
@@ -724,6 +735,40 @@ impl AccountsBackgroundService {
     pub fn join(self) -> thread::Result<()> {
         self.t_background.join()
     }
+
+    /// Returns an object to query/manage the status of ABS
+    pub fn status(&self) -> &AbsStatus {
+        &self.status
+    }
+}
+
+/// Query and manage the status of AccountsBackgroundService
+#[derive(Debug, Clone)]
+pub struct AbsStatus {
+    /// Flag to query if ABS is running
+    is_running: Arc<AtomicBool>,
+    /// Flag to set to stop ABS
+    stop: Arc<AtomicBool>,
+}
+
+impl AbsStatus {
+    /// Returns if ABS is running
+    pub fn is_running(&self) -> bool {
+        self.is_running.load(Ordering::Relaxed)
+    }
+
+    /// Raises the flag for ABS to stop
+    pub fn stop(&self) {
+        self.stop.store(true, Ordering::Relaxed)
+    }
+
+    #[cfg(feature = "dev-context-only-utils")]
+    pub fn new_for_tests() -> Self {
+        Self {
+            is_running: Arc::new(AtomicBool::new(false)),
+            stop: Arc::new(AtomicBool::new(false)),
+        }
+    }
 }
 
 /// Get the AccountsPackageKind from a given SnapshotRequest
diff --git a/wen-restart/src/wen_restart.rs b/wen-restart/src/wen_restart.rs
index 0c901e65fffc2f..87259051507069 100644
--- a/wen-restart/src/wen_restart.rs
+++ b/wen-restart/src/wen_restart.rs
@@ -34,7 +34,7 @@ use {
     },
     solana_pubkey::Pubkey,
     solana_runtime::{
-        accounts_background_service::AbsRequestSender,
+        accounts_background_service::{AbsRequestSender, AbsStatus},
         bank::Bank,
         bank_forks::BankForks,
         snapshot_archive_info::SnapshotArchiveInfoGetter,
@@ -474,6 +474,7 @@ pub(crate) fn generate_snapshot(
     bank_forks: Arc<RwLock<BankForks>>,
     snapshot_config: &SnapshotConfig,
     accounts_background_request_sender: &AbsRequestSender,
+    abs_status: &AbsStatus,
     genesis_config_hash: Hash,
     my_heaviest_fork_slot: Slot,
 ) -> Result<GenerateSnapshotRecord> {
@@ -506,11 +507,34 @@ pub(crate) fn generate_snapshot(
             accounts_background_request_sender,
         )?;
     }
+
     // There can't be more than one EAH calculation in progress. If new_root is generated
     // within the EAH window (1/4 epoch to 3/4 epoch), the following function will wait for
     // EAH calculation to finish. So if we trigger another EAH when generating snapshots
     // we won't hit a panic.
     let _ = new_root_bank.get_epoch_accounts_hash_to_serialize();
+
+    // Snapshot generation calls AccountsDb background tasks (flush/clean/shrink).
+    // These cannot run concurrently with each other, so we must shut down
+    // AccountsBackgroundService before proceeding.
+    abs_status.stop();
+    info!("Waiting for AccountsBackgroundService to stop");
+    while abs_status.is_running() {
+        std::thread::yield_now();
+    }
+    // Similar to waiting for ABS to stop, we also wait for the initial startup
+    // verification to complete. The startup verification runs in the background
+    // and verifies the snapshot's accounts are correct. We only want a
+    // single accounts hash calculation to run at a time, and since snapshot
+    // creation below will calculate the accounts hash, we wait for the startup
+    // verification to complete before proceeding.
+    new_root_bank
+        .rc
+        .accounts
+        .accounts_db
+        .verify_accounts_hash_in_bg
+        .join_background_thread();
+
     let mut directory = &snapshot_config.full_snapshot_archives_dir;
     // Calculate the full_snapshot_slot an incremental snapshot should depend on. If the
     // validator is configured not to generate snapshots, it will only have the initial
@@ -973,6 +997,7 @@ pub struct WenRestartConfig {
     pub wait_for_supermajority_threshold_percent: u64,
     pub snapshot_config: SnapshotConfig,
     pub accounts_background_request_sender: AbsRequestSender,
+    pub abs_status: AbsStatus,
     pub genesis_config_hash: Hash,
     pub exit: Arc<AtomicBool>,
 }
@@ -1084,6 +1109,7 @@ pub fn wait_for_wen_restart(config: WenRestartConfig) -> Result<()> {
                 config.bank_forks.clone(),
                 &config.snapshot_config,
                 &config.accounts_background_request_sender,
+                &config.abs_status,
                 config.genesis_config_hash,
                 my_heaviest_fork_slot,
             )?,
@@ -1686,6 +1712,7 @@ mod tests {
             wait_for_supermajority_threshold_percent: 80,
             snapshot_config: SnapshotConfig::default(),
             accounts_background_request_sender: AbsRequestSender::default(),
+            abs_status: AbsStatus::new_for_tests(),
             genesis_config_hash: test_state.genesis_config_hash,
             exit: exit.clone(),
         };
@@ -1757,6 +1784,7 @@ mod tests {
             wait_for_supermajority_threshold_percent: 80,
             snapshot_config,
             accounts_background_request_sender: AbsRequestSender::default(),
+            abs_status: AbsStatus::new_for_tests(),
             genesis_config_hash: test_state.genesis_config_hash,
             exit: exit.clone(),
         };
@@ -2111,6 +2139,7 @@ mod tests {
             wait_for_supermajority_threshold_percent: 80,
             snapshot_config: SnapshotConfig::default(),
             accounts_background_request_sender: AbsRequestSender::default(),
+            abs_status: AbsStatus::new_for_tests(),
             genesis_config_hash: test_state.genesis_config_hash,
             exit: Arc::new(AtomicBool::new(false)),
         })
@@ -3238,6 +3267,7 @@ mod tests {
                 test_state.bank_forks.clone(),
                 &snapshot_config,
                 &AbsRequestSender::default(),
+                &AbsStatus::new_for_tests(),
                 test_state.genesis_config_hash,
                 old_root_slot,
             )
@@ -3253,6 +3283,7 @@ mod tests {
                 test_state.bank_forks.clone(),
                 &snapshot_config,
                 &AbsRequestSender::default(),
+                &AbsStatus::new_for_tests(),
                 test_state.genesis_config_hash,
                 new_root_slot,
             )
@@ -3302,6 +3333,7 @@ mod tests {
                 test_state.bank_forks.clone(),
                 &snapshot_config,
                 &AbsRequestSender::default(),
+                &AbsStatus::new_for_tests(),
                 test_state.genesis_config_hash,
                 old_root_slot,
             )
@@ -3323,6 +3355,7 @@ mod tests {
                 test_state.bank_forks.clone(),
                 &snapshot_config,
                 &AbsRequestSender::default(),
+                &AbsStatus::new_for_tests(),
                 test_state.genesis_config_hash,
                 older_slot,
             )
@@ -3345,6 +3378,7 @@ mod tests {
                 test_state.bank_forks.clone(),
                 &snapshot_config,
                 &AbsRequestSender::default(),
+                &AbsStatus::new_for_tests(),
                 test_state.genesis_config_hash,
                 empty_slot,
             )
@@ -3367,6 +3401,7 @@ mod tests {
                 test_state.bank_forks.clone(),
                 &snapshot_config,
                 &AbsRequestSender::default(),
+                &AbsStatus::new_for_tests(),
                 test_state.genesis_config_hash,
                 test_state.last_voted_fork_slots[0],
             )
@@ -3397,6 +3432,7 @@ mod tests {
             wait_for_supermajority_threshold_percent: 80,
             snapshot_config: SnapshotConfig::default(),
             accounts_background_request_sender: AbsRequestSender::default(),
+            abs_status: AbsStatus::new_for_tests(),
             genesis_config_hash: test_state.genesis_config_hash,
             exit: Arc::new(AtomicBool::new(false)),
         };
@@ -3643,6 +3679,7 @@ mod tests {
             wait_for_supermajority_threshold_percent: 80,
             snapshot_config: SnapshotConfig::default(),
             accounts_background_request_sender: AbsRequestSender::default(),
+            abs_status: AbsStatus::new_for_tests(),
             genesis_config_hash: test_state.genesis_config_hash,
             exit: exit.clone(),
         };
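
The AbsStatus type this patch adds is a pair of shared atomic flags: `stop` is raised by a controller (here, wen_restart's generate_snapshot) to ask the background loop to exit, and `is_running` is lowered by the loop itself only after its final iteration has finished, so a waiter that observes `is_running == false` knows no flush/clean/shrink work is still in flight. Below is a minimal, self-contained sketch of that pattern using only the Rust standard library; the `Status` and `spawn_worker` names are illustrative, not agave APIs.

    use std::{
        sync::{
            atomic::{AtomicBool, Ordering},
            Arc,
        },
        thread,
        time::Duration,
    };

    #[derive(Clone)]
    struct Status {
        is_running: Arc<AtomicBool>, // lowered by the worker after its last iteration
        stop: Arc<AtomicBool>,       // raised by a controller to request shutdown
    }

    fn spawn_worker() -> (Status, thread::JoinHandle<()>) {
        let status = Status {
            is_running: Arc::new(AtomicBool::new(true)),
            stop: Arc::new(AtomicBool::new(false)),
        };
        let handle = thread::Builder::new()
            .name("worker".to_string())
            .spawn({
                let status = status.clone();
                move || {
                    // The stop flag is only checked at the top of each iteration,
                    // so an in-progress iteration always runs to completion.
                    while !status.stop.load(Ordering::Relaxed) {
                        // ... one iteration of background work ...
                        thread::sleep(Duration::from_millis(100));
                    }
                    // Lower is_running only after all work has ceased.
                    status.is_running.store(false, Ordering::Relaxed);
                }
            })
            .unwrap();
        (status, handle)
    }

    fn main() {
        let (status, handle) = spawn_worker();
        status.stop.store(true, Ordering::Relaxed); // request shutdown
        while status.is_running.load(Ordering::Relaxed) {
            thread::yield_now(); // the same busy-wait generate_snapshot() uses
        }
        handle.join().unwrap();
    }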
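Note that the ABS loop now checks two flags: the validator-wide `exit` and the new ABS-only `stop`. That separation is what lets generate_snapshot stop just this one service while the rest of the validator keeps running; the fatal-error path in the loop still sets `exit` to bring down the whole process. The stop-and-wait step in generate_snapshot can be read as the following free function; `stop_abs_and_wait` is a hypothetical name for illustration, not part of this patch.

    // Assumes `AbsStatus` as defined in accounts_background_service.rs above.
    fn stop_abs_and_wait(abs_status: &AbsStatus) {
        // Raise the stop flag; ABS breaks out of its loop at the top of the
        // next iteration, so any in-progress flush/clean/shrink finishes first.
        abs_status.stop();
        // Spin until ABS lowers is_running, i.e. until its thread is past the
        // last point where it could touch AccountsDb.
        while abs_status.is_running() {
            std::thread::yield_now();
        }
    }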