From bbe2b808ea67fc891aa28dba0f7ed969baa4262f Mon Sep 17 00:00:00 2001 From: Vladimir Motylenko Date: Tue, 11 Feb 2025 18:00:53 +0200 Subject: [PATCH] fix: Add test for gc and remove all panics. --- core/src/replay_stage.rs | 2 +- core/src/tvu.rs | 2 +- core/src/validator.rs | 2 +- evm-utils/evm-state/src/state.rs | 3 +- evm-utils/evm-state/src/storage/mod.rs | 8 +- .../subchain-manager/src/implementation.rs | 3 +- ledger/src/bank_forks_utils.rs | 2 +- ledger/src/blockstore_processor.rs | 2 +- ledger/src/evm/mod.rs | 231 +++++++++++++++--- ledger/src/evm/{recoreder.rs => recorder.rs} | 1 + notes.md | 3 +- rpc/src/evm_rpc_impl/mod.rs | 25 +- rpc/src/rpc.rs | 2 +- scripts/run.sh | 3 +- test-validator/src/lib.rs | 2 +- validator/src/admin_rpc_service.rs | 3 +- validator/src/main.rs | 16 +- 17 files changed, 238 insertions(+), 72 deletions(-) rename ledger/src/evm/{recoreder.rs => recorder.rs} (99%) diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 0ff5b3c9c8..57fd4ec917 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -35,7 +35,7 @@ use { block_error::BlockError, blockstore::Blockstore, blockstore_processor::{self, BlockstoreProcessorError, TransactionStatusSender}, - evm::recoreder::EvmArchiveManagerSender, + evm::recorder::EvmArchiveManagerSender, leader_schedule_cache::LeaderScheduleCache, leader_schedule_utils::first_of_consecutive_leader_slots, }, diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 72c58f456b..f5e6a54aaf 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -33,7 +33,7 @@ use { solana_gossip::cluster_info::ClusterInfo, solana_ledger::{ blockstore::Blockstore, blockstore_processor::TransactionStatusSender, - evm::recoreder::EvmArchiveManagerSender, leader_schedule_cache::LeaderScheduleCache, + evm::recorder::EvmArchiveManagerSender, leader_schedule_cache::LeaderScheduleCache, }, solana_poh::poh_recorder::PohRecorder, solana_rpc::{ diff --git a/core/src/validator.rs b/core/src/validator.rs index 8fc49f135c..b84b8b16f8 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -42,7 +42,7 @@ use { blockstore_db::{BlockstoreOptions, BlockstoreRecoveryMode, LedgerColumnOptions}, blockstore_processor::{self, TransactionStatusSender}, evm::{ - recoreder::{ + recorder::{ EvmArchiveManagerReceiver, EvmArchiveManagerSender, EvmArchiveManagerService, }, EvmArchive, EvmArchiveInner, EvmArchiveType, diff --git a/evm-utils/evm-state/src/state.rs b/evm-utils/evm-state/src/state.rs index 9fe981fc0f..ee4dff9b6a 100644 --- a/evm-utils/evm-state/src/state.rs +++ b/evm-utils/evm-state/src/state.rs @@ -275,7 +275,7 @@ impl EvmBackend { } } - pub fn set_initial( + pub fn init_accounts_without_commit( &mut self, accounts: impl IntoIterator, ) { @@ -298,7 +298,6 @@ impl EvmBackend { self.set_account_state(address, account_state); self.ext_storage(address, storage); } - self.flush_changes() } pub fn new_incomming_for_root(mut self, root: H256) -> Option { diff --git a/evm-utils/evm-state/src/storage/mod.rs b/evm-utils/evm-state/src/storage/mod.rs index 5435b47aec..24de8c4171 100644 --- a/evm-utils/evm-state/src/storage/mod.rs +++ b/evm-utils/evm-state/src/storage/mod.rs @@ -273,7 +273,7 @@ impl StorageSecondary { } fn open(location: Location, gc_enabled: bool) -> Result { - log::warn!("gc_enabled {}", gc_enabled); + log::debug!("gc_enabled {}", gc_enabled); let db_opts = default_db_opts()?; let descriptors = Descriptors::secondary_descriptors(gc_enabled); @@ -321,7 +321,7 @@ impl Storage { } fn open(location: Location, gc_enabled: bool) -> Result { - log::warn!("gc_enabled {}", gc_enabled); + log::debug!("gc_enabled {}", gc_enabled); log::info!("location is {:?}", location); let db_opts = default_db_opts()?; @@ -332,7 +332,7 @@ impl Storage { let descriptors = Descriptors::compute(exist_cfs, &db_opts, gc_enabled); let db = { - warn!("Trying as primary at : {:?}", &location); + info!("Trying as primary at : {:?}", &location); let mut db = DB::open_cf_descriptors(&db_opts, &location, descriptors.all)?; for removed_cf in descriptors.cleanup_cfs { @@ -888,6 +888,8 @@ pub struct RootCleanup<'a> { } impl<'a> RootCleanup<'a> { + // cleanup nodes from trie, + // remove only nodes that already has zero parents. pub fn new(storage: &'a Storage, roots: Vec) -> Self { Self { elems: roots, diff --git a/evm-utils/subchain-manager/src/implementation.rs b/evm-utils/subchain-manager/src/implementation.rs index b9073e1845..b0b476d49b 100644 --- a/evm-utils/subchain-manager/src/implementation.rs +++ b/evm-utils/subchain-manager/src/implementation.rs @@ -75,7 +75,8 @@ impl genesis_json::GenesisConfig { let simulation = client.simulate_transaction(&transaction)?; if let Some(err) = simulation.value.err { return Err(color_eyre::eyre::Error::msg(format!( - "Simulation error: {err}" + "Simulation error: {err}, logs: {:?}", + simulation.value.logs ))); } diff --git a/ledger/src/bank_forks_utils.rs b/ledger/src/bank_forks_utils.rs index 3f3c43eb6d..b59a93885e 100644 --- a/ledger/src/bank_forks_utils.rs +++ b/ledger/src/bank_forks_utils.rs @@ -5,7 +5,7 @@ use { self, BlockstoreProcessorError, CacheBlockMetaSender, ProcessOptions, TransactionStatusSender, }, - evm::recoreder::EvmArchiveManagerSender, + evm::recorder::EvmArchiveManagerSender, leader_schedule_cache::LeaderScheduleCache, }, crossbeam_channel::unbounded, diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index b153d34430..95d0f4f676 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -4,7 +4,7 @@ use { blockstore::Blockstore, blockstore_db::BlockstoreError, blockstore_meta::SlotMeta, - evm::recoreder::{EvmArchiveManagerRequest, EvmArchiveManagerSender, RecorderEntry}, + evm::recorder::{EvmArchiveManagerRequest, EvmArchiveManagerSender, RecorderEntry}, leader_schedule_cache::LeaderScheduleCache, token_balances::collect_token_balances, }, diff --git a/ledger/src/evm/mod.rs b/ledger/src/evm/mod.rs index 67c99168ec..22bca3f11d 100644 --- a/ledger/src/evm/mod.rs +++ b/ledger/src/evm/mod.rs @@ -1,6 +1,6 @@ use { crate::blockstore::{Blockstore, BlockstoreError}, - evm_state::{ChangedState, EvmState, Incomming, Storage, H256}, + evm_state::{storage::RootCleanup, ChangedState, Storage, H256}, solana_program_runtime::evm_executor_context::{ChainID, StateExt}, solana_runtime::bank::Bank, std::{ @@ -9,7 +9,7 @@ use { }, triedb::gc::DbCounter, }; -pub mod recoreder; +pub mod recorder; //TODO Fix cleanup of blocks for subchain todo!() @@ -42,7 +42,7 @@ pub struct EvmArchiveInner { blockstore: Arc, } pub const EVM_ARCHIVE_PATH: &str = "evm_archive_state"; -pub const EVM_ARCHIVE_LIMIT_BLOCKS: u64 = 1000; +pub const EVM_ARCHIVE_LIMIT_BLOCKS: u64 = 100; // priv api impl EvmArchiveInner { pub fn testing(ledger_path: impl AsRef, blockstore: Arc) -> Self { @@ -55,8 +55,11 @@ impl EvmArchiveInner { blockstore: Arc, ) -> Self { let storage = match &archive_type { - EvmArchiveType::WithGc(_) => { - info!("Opening temporary evm archive storage"); + EvmArchiveType::WithGc(p) => { + info!( + "Opening temporary evm archive storage, persist_last_slots={}", + p.states_per_chain + ); let evm_state_path = PathBuf::from(ledger_path.as_ref()).join(EVM_ARCHIVE_PATH); Storage::open_persistent(evm_state_path, true) .expect("Cannot open evm archive folder") @@ -86,30 +89,64 @@ impl EvmArchiveInner { } let root = storage.flush_changes(state_root, state_updates); let trie = storage.rocksdb_trie_handle(); + log::trace!("pin root: {:?}", root); // register root trie.gc_pin_root(root); } - fn unregister_state(storage: &Storage, state_root: H256) { + fn unregister_state( + storage: &Storage, + state_root: H256, + ) -> Result<(), evm_state::storage::Error> { let trie = storage.rocksdb_trie_handle(); trie.gc_unpin_root(state_root); - todo!() // cleanup trie if needed + RootCleanup::new(storage, vec![state_root]).cleanup()?; + Ok(()) } -} -// pub api -impl EvmArchiveInner { + fn write_evm_block(blockstore: &Blockstore, chain: Option, block: evm_state::Block) { + // let (chain, block) = evm_records_receiver.recv_timeout(Duration::from_secs(1))?; + let block_header = block.header; + info!( + "Writing evm block num = {} for chain = {:?}", + block_header.block_number, chain + ); + blockstore + .write_evm_block_header(&chain, &block_header) + .expect("Expected database write to succed"); + for (hash, tx) in block.transactions { + blockstore + .write_evm_transaction( + &chain, + block_header.block_number, + block_header.native_chain_slot, + hash, + tx, + ) + .expect("Expected database write to succed"); + } + } fn cleanup_block( storage: &Storage, blockstore: &Blockstore, chain: Option, block_num: u64, ) -> Result<(), BlockstoreError> { + info!( + "Cleaning evm block num = {} for chain = {:?}", + block_num, chain + ); let (block, _) = blockstore.get_evm_block(chain, block_num)?; blockstore.remove_evm_block(&chain, &block.header)?; - Self::unregister_state(storage, block.header.state_root); + Self::unregister_state(storage, block.header.state_root) + .map_err(|_| BlockstoreError::Other("Failed to process block cleanup"))?; Ok(()) } +} +// pub api +// pub fn - can use in rpc +// fn - can be used in recorder +impl EvmArchiveInner { pub fn count_evm_blocks( blockstore: &Blockstore, chain: Option, @@ -122,12 +159,24 @@ impl EvmArchiveInner { Ok(0) } } + // Get num_blocks/states per subchain + // get accumulated purged blocks per subchain + // get total size (in blocks, mbs) of db + + // fn collect_metrics(&self) { + // for chain in chain_list { - fn write_evm_record(&self, record: recoreder::RecorderEntry) { + // let first = self.blockstore.get_first_available_evm_block(chain)?; + // let last = self.blockstore.get_last_available_evm_block(chain)?; + // let num_blocks = (last - first); + // } + // } + + fn write_evm_record(&self, record: recorder::RecorderEntry) { Self::write_evm_block(&self.blockstore, record.chain, record.block.clone()); - // TODO: Push block? // push state to archive Self::register_state(&self.storage, record.state_root, record.state_updates); + // ensure that block contain root to new state let block_num = record.block.header.block_number; if let EvmArchiveType::WithGc(gc) = &self.archive_type { @@ -145,10 +194,10 @@ impl EvmArchiveInner { debug_assert!(first_block_num <= block_num); assert!(first_block_num <= block_num); - let block_to_purge = block_num.saturating_sub(gc.states_per_chain); + let block_to_purge = block_num.saturating_sub(gc.states_per_chain - 1); // cleanup old blocks - for block_num in first_block_num..=block_to_purge { + for block_num in first_block_num..block_to_purge { match Self::cleanup_block(&self.storage, &self.blockstore, record.chain, block_num) { Ok(_) => {} @@ -163,28 +212,6 @@ impl EvmArchiveInner { } } - fn write_evm_block(blockstore: &Blockstore, chain: Option, block: evm_state::Block) { - // let (chain, block) = evm_records_receiver.recv_timeout(Duration::from_secs(1))?; - let block_header = block.header; - debug!( - "Writing evm block num = {} for chain = {:?}", - block_header.block_number, chain - ); - blockstore - .write_evm_block_header(&chain, &block_header) - .expect("Expected database write to succed"); - for (hash, tx) in block.transactions { - blockstore - .write_evm_transaction( - &chain, - block_header.block_number, - block_header.native_chain_slot, - hash, - tx, - ) - .expect("Expected database write to succed"); - } - } pub fn get_state( &self, chain_id: Option, @@ -223,3 +250,133 @@ impl EvmArchiveInner { } pub type EvmArchive = Arc; + +#[cfg(test)] +mod test { + use { + super::*, + evm_state::{AccountState, Block, BlockHeader, Maybe, H160}, + std::collections::HashMap, + tempfile::tempdir, + triedb::empty_trie_hash, + }; + + fn make_changes(idx: u8) -> ChangedState { + let key = H160::repeat_byte(idx); + let acc = AccountState { + nonce: idx.into(), + balance: idx.into(), + code: Default::default(), + }; + let mut state = HashMap::new(); + state.insert(key, (Maybe::Just(acc), HashMap::new())); + state + } + fn next(tmp_storage: &Storage, record: &recorder::RecorderEntry) -> recorder::RecorderEntry { + let changes = make_changes(record.block.header.block_number as u8); + let old_root = record.block.header.state_root; + let new_state = tmp_storage.flush_changes(old_root, changes.clone()); + log::info!("Old root = {:?}", old_root); + log::info!("New root = {:?}", new_state); + assert_ne!(new_state, old_root); + let block_header = BlockHeader { + block_number: record.block.header.block_number + 1, + parent_hash: record.block.header.hash(), + state_root: new_state, + ..record.block.header.clone() + }; + let block = Block { + header: block_header, + transactions: Default::default(), + }; + recorder::RecorderEntry { + chain: record.chain, + block, + state_root: old_root, + state_updates: changes, + } + } + // create archive with gc and num_blocks = 1 + // push blocks to archive + // get state with root of first state to purge + // add one block to ensure that state is purged + // check if state ref is still available + #[test] + fn test_lock_get_state() { + solana_logger::setup_with_default("trace,triedb=warn"); + let ledger_path = tempdir().unwrap(); + + let num_blocks = 3; + let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap()); + let new_archive = EvmArchiveInner::new( + EvmArchiveType::WithGc(EvmArchiveGc::new(num_blocks)), + ledger_path.path(), + blockstore.clone(), + ); + + assert!(new_archive.is_gc()); + let chain = None; + let mut record = recorder::RecorderEntry { + chain, + block: Block { + header: BlockHeader::new( + Default::default(), + 0, + empty_trie_hash(), + 0, + 0, + 0, + 0, + H256::zero(), + vec![].into_iter(), + evm_state::BlockVersion::VersionConsistentHashes, + ), + transactions: Default::default(), + }, + state_root: empty_trie_hash(), + state_updates: Default::default(), + }; // first update without changes in state + + new_archive.write_evm_record(record.clone()); + let mut states = vec![]; + + let tmp_storage = Storage::create_temporary().unwrap(); + for _ in 0..num_blocks { + record = next(&tmp_storage, &record); + new_archive.write_evm_record(record.clone()); + states.push(record.clone()); + } + assert_eq!(states.len(), num_blocks as usize); + for state in &states { + assert!(new_archive.storage.check_root_exist(state.state_root)) + } + log::info!("Starting gc"); + let lock_root = states[0].block.header.state_root; + let mocked_bank = Bank::default_for_tests(); + // while this state exist lock_root should also exist in db + let state = new_archive + .get_state(chain, lock_root, &mocked_bank, None) + .unwrap(); + + let new_record = next(&tmp_storage, &record); + assert_ne!(new_record.state_root, states[0].block.header.state_root); + new_archive.write_evm_record(new_record); + log::info!("first state root = {:?}", states[0].block.header.state_root); + // first state should be removed + assert!(new_archive + .storage + .check_root_exist(states[0].block.header.state_root)); + drop(state); + + assert!(!new_archive + .storage + .check_root_exist(states[0].block.header.state_root)) + } +} + +// 1. todo remove +// 2. lock_state - unit test +// 3. start node +// 4. metrics +// 5. lock_state fix +// 6. rpc test diff --git a/ledger/src/evm/recoreder.rs b/ledger/src/evm/recorder.rs similarity index 99% rename from ledger/src/evm/recoreder.rs rename to ledger/src/evm/recorder.rs index eb86605fca..8a22a93c07 100644 --- a/ledger/src/evm/recoreder.rs +++ b/ledger/src/evm/recorder.rs @@ -13,6 +13,7 @@ use { }, }; +#[derive(Clone)] pub struct RecorderEntry { pub chain: Option, pub state_root: H256, diff --git a/notes.md b/notes.md index a99d876f82..c27987c7f5 100644 --- a/notes.md +++ b/notes.md @@ -23,4 +23,5 @@ 3. new eip tx format 4. new evm version -fix cleanup old blocks for subchain (search todo!()) \ No newline at end of file +fix cleanup old blocks for subchain (search todo!()) +test lock_root in rpc (get state, and remove in recorder thread). \ No newline at end of file diff --git a/rpc/src/evm_rpc_impl/mod.rs b/rpc/src/evm_rpc_impl/mod.rs index 9ea8f6dfba..b0a0a51257 100644 --- a/rpc/src/evm_rpc_impl/mod.rs +++ b/rpc/src/evm_rpc_impl/mod.rs @@ -121,32 +121,23 @@ async fn block_to_state_root( block: Option, meta: &JsonRpcRequestProcessor, ) -> Result { - let main_chain = chain.is_none(); + if let Some(chain_id) = chain { + if chain_id == meta.get_main_chain_id() { + return Err(Error::WrongChainId { + chain_id: meta.get_main_chain_id(), + tx_chain_id: None, + }); + } + }; let block_id = block.unwrap_or_default(); let mut found_block_hash = None; - match block_id { - BlockId::RelativeId(BlockRelId::Pending) | BlockId::RelativeId(BlockRelId::Latest) => {} - _ => { - //TODO(H): Add support of block_by_num state on subchain - if !main_chain { - return Err(Error::InvalidParams {}); - } - } - } - let block_num = match block_id { BlockId::RelativeId(BlockRelId::Pending) | BlockId::RelativeId(BlockRelId::Latest) => { let bank = meta.bank(Some(CommitmentConfig::processed())); let last_root = if let Some(chain_id) = chain { - if chain_id == meta.get_main_chain_id() { - return Err(Error::WrongChainId { - chain_id: meta.get_main_chain_id(), - tx_chain_id: None, - }); - } bank.evm().chain_state(chain_id).evm_state.last_root() } else { let evm = bank.evm().main_chain().state(); diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index 560d668d81..e9528dbd08 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -2475,7 +2475,7 @@ impl JsonRpcRequestProcessor { id: evm_state::BlockNum, ) -> Option<(evm_state::Block, bool)> { let block = self.blockstore.get_evm_block(chain, id).ok(); - if block.is_some() || chain.is_none() { + if block.is_some() { return block; } diff --git a/scripts/run.sh b/scripts/run.sh index b5e3968dd1..1af4c27b7e 100755 --- a/scripts/run.sh +++ b/scripts/run.sh @@ -125,7 +125,8 @@ args=( --account-index velas-accounts-storages --account-index velas-accounts-owners --account-index velas-accounts-operationals - --evm-state-archive "$ledgerDir"/archive-evm + --evm-save-blocks 1000 + # --evm-state-archive "$ledgerDir"/archive-evm ) # shellcheck disable=SC2086 velas-validator "${args[@]}" $SOLANA_RUN_SH_VALIDATOR_ARGS & diff --git a/test-validator/src/lib.rs b/test-validator/src/lib.rs index a691a18ca2..f5b77c4d72 100644 --- a/test-validator/src/lib.rs +++ b/test-validator/src/lib.rs @@ -22,7 +22,7 @@ use { blockstore::{create_new_ledger, EvmStateJson}, blockstore_db::LedgerColumnOptions, create_new_tmp_ledger, - evm::recoreder::{EvmArchiveManagerReceiver, EvmArchiveManagerSender}, + evm::recorder::{EvmArchiveManagerReceiver, EvmArchiveManagerSender}, }, solana_net_utils::PortRange, solana_rpc::{rpc::JsonRpcConfig, rpc_pubsub_service::PubSubConfig}, diff --git a/validator/src/admin_rpc_service.rs b/validator/src/admin_rpc_service.rs index 72053eb18d..19229843b3 100644 --- a/validator/src/admin_rpc_service.rs +++ b/validator/src/admin_rpc_service.rs @@ -11,7 +11,7 @@ use { consensus::Tower, tower_storage::TowerStorage, validator::ValidatorStartProgress, }, solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo}, - solana_ledger::evm::recoreder::{EvmArchiveManagerRequest, EvmArchiveManagerSender}, + solana_ledger::evm::recorder::{EvmArchiveManagerRequest, EvmArchiveManagerSender}, solana_runtime::bank_forks::BankForks, solana_sdk::{ exit::Exit, @@ -316,6 +316,7 @@ impl AdminRpc for AdminRpcImpl { backup: bool, ) -> Result<()> { info!("Merging evm state: {}, backup: {}", path, backup); + //todo: oneshot let (tx, rx) = unbounded(); let evm_archive_state = 'try_send: { if meta diff --git a/validator/src/main.rs b/validator/src/main.rs index 9d8417889e..ebaf865654 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -2,7 +2,7 @@ use crossbeam_channel::unbounded; #[cfg(not(target_env = "msvc"))] use jemallocator::Jemalloc; -use solana_ledger::evm::{EvmArchive, EvmArchiveType}; +use solana_ledger::evm::{EvmArchive, EvmArchiveGc, EvmArchiveType, EVM_ARCHIVE_LIMIT_BLOCKS}; use { clap::{ crate_description, crate_name, value_t, value_t_or_exit, values_t, values_t_or_exit, App, @@ -538,8 +538,17 @@ pub fn main() { .long("evm-state-archive") .value_name("DIR") .takes_value(true) + .conflicts_with("evm_save_blocks") .help("Use DIR as evm-state archive location"), ) + .arg( + Arg::with_name("evm_save_blocks") + .long("evm-save-blocks") + .value_name("NUM") + .takes_value(true) + .conflicts_with("evm_state_archive_path") + .help("Specify amount of evm blocks to store in archive"), + ) .arg( Arg::with_name("evm_state_rpc_port") .long("evm-state-rpc-port") @@ -2264,7 +2273,10 @@ pub fn main() { }; let evm_state_archive_params = match matches.value_of("evm_state_archive_path") { Some(path) => EvmArchiveType::NoCleanup(path.to_owned()), - _ => EvmArchiveType::default_gc(), // TODO: config block count + _ => { + let num_blocks = matches.value_of("evm_save_blocks").and_then(|v|v.parse::().ok()).unwrap_or(EVM_ARCHIVE_LIMIT_BLOCKS); + EvmArchiveType::WithGc(EvmArchiveGc::new(num_blocks)) + } };