-
Notifications
You must be signed in to change notification settings - Fork 111
perf/bg_trie_update update #4988
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,3 +1,5 @@ | ||
| #![allow(clippy::type_complexity)] | ||
|
|
||
| use crate::{ | ||
| rlp::AccountCodeHashRLP, | ||
| trie_db::{ | ||
|
|
@@ -23,7 +25,7 @@ use std::{ | |
| mem::take, | ||
| path::Path, | ||
| sync::{ | ||
| Arc, Mutex, RwLock, | ||
| Arc, | ||
| mpsc::{SyncSender, sync_channel}, | ||
| }, | ||
| }; | ||
|
|
@@ -127,7 +129,7 @@ enum FKVGeneratorControlMessage { | |
| #[derive(Debug, Clone)] | ||
| pub struct Store { | ||
| db: Arc<DBWithThreadMode<MultiThreaded>>, | ||
| trie_cache: Arc<Mutex<Arc<TrieLayerCache>>>, | ||
| trie_cache: Arc<TrieLayerCache>, | ||
| flatkeyvalue_control_tx: std::sync::mpsc::SyncSender<FKVGeneratorControlMessage>, | ||
| trie_update_worker_tx: std::sync::mpsc::SyncSender<( | ||
| std::sync::mpsc::SyncSender<Result<(), StoreError>>, | ||
|
|
@@ -669,22 +671,12 @@ impl Store { | |
| .chain(account_updates) | ||
| .collect(); | ||
| // Read-Copy-Update the trie cache with a new layer. | ||
| let trie = trie_cache | ||
| .lock() | ||
| .map_err(|_| StoreError::LockError)? | ||
| .clone(); | ||
| let mut trie_mut = (&*trie).clone(); | ||
| trie_mut.put_batch(parent_state_root, child_state_root, new_layer); | ||
| *trie_cache.lock().map_err(|_| StoreError::LockError)? = Arc::new(trie_mut); | ||
| trie_cache.put_batch(parent_state_root, child_state_root, new_layer); | ||
| // Update finished, signal block production. | ||
| notify.send(Ok(())).map_err(|_| StoreError::LockError)?; | ||
|
|
||
| // Phase 2: update disk layer. | ||
| let trie = trie_cache | ||
| .lock() | ||
| .map_err(|_| StoreError::LockError)? | ||
| .clone(); | ||
| let Some(root) = trie.get_commitable(parent_state_root, COMMIT_THRESHOLD) else { | ||
| let Some(root) = trie_cache.get_commitable(parent_state_root, COMMIT_THRESHOLD) else { | ||
| // Nothing to commit to disk, move on. | ||
| return Ok(()); | ||
| }; | ||
|
|
@@ -693,14 +685,13 @@ impl Store { | |
| let _ = fkv_ctl.send(FKVGeneratorControlMessage::Stop); | ||
|
|
||
| // RCU to remove the bottom layer: update step needs to happen after disk layer is updated. | ||
| let mut trie_mut = (&*trie).clone(); | ||
| let mut batch = WriteBatch::default(); | ||
| let [cf_trie_nodes, cf_flatkeyvalue, cf_misc] = | ||
| open_cfs(db, [CF_TRIE_NODES, CF_FLATKEYVALUE, CF_MISC_VALUES])?; | ||
|
|
||
| let last_written = db.get_cf(&cf_misc, "last_written")?.unwrap_or_default(); | ||
| // Commit removes the bottom layer and returns it, this is the mutation step. | ||
| let nodes = trie_mut.commit(root).unwrap_or_default(); | ||
| let nodes = trie_cache.commit(root).unwrap_or_default(); | ||
| for (key, value) in nodes { | ||
| let is_leaf = key.len() == 65 || key.len() == 131; | ||
|
|
||
|
|
@@ -719,8 +710,6 @@ impl Store { | |
| } | ||
| } | ||
| db.write(batch)?; | ||
| // Phase 3: update diff layers with the removal of bottom layer. | ||
| *trie_cache.lock().map_err(|_| StoreError::LockError)? = Arc::new(trie_mut); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This mutation needs to happen at the end. Otherwise you lose the bottom layer BEFORE there is a backing disk layer with the data visible to users. |
||
| Ok(()) | ||
| } | ||
| } | ||
|
|
@@ -841,7 +830,7 @@ impl StoreEngine for Store { | |
| // Wait for an updated top layer so every caller afterwards sees a consistent view. | ||
| // Specifically, the next block produced MUST see this upper layer. | ||
| rx.recv() | ||
| .map_err(|e| StoreError::Custom(format!("recv failed: {e}")))?; | ||
| .map_err(|e| StoreError::Custom(format!("recv failed: {e}")))??; | ||
| } | ||
// After top-level is added, we can make the rest of the changes visible. | ||
| db.write(batch) | ||
|
|
@@ -1397,7 +1386,7 @@ impl StoreEngine for Store { | |
| let db = Box::new(RocksDBTrieDB::new(self.db.clone(), CF_TRIE_NODES, None)?); | ||
| let wrap_db = Box::new(TrieWrapper { | ||
| state_root, | ||
| inner: self.trie_cache.lock().unwrap().clone(), | ||
| inner: self.trie_cache.clone(), | ||
| db, | ||
| prefix: Some(hashed_address), | ||
| }); | ||
|
|
@@ -1409,7 +1398,7 @@ impl StoreEngine for Store { | |
| let db = Box::new(RocksDBTrieDB::new(self.db.clone(), CF_TRIE_NODES, None)?); | ||
| let wrap_db = Box::new(TrieWrapper { | ||
| state_root, | ||
| inner: self.trie_cache.lock().unwrap().clone(), | ||
| inner: self.trie_cache.clone(), | ||
| db, | ||
| prefix: None, | ||
| }); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,12 +2,15 @@ use ethrex_common::H256; | |
| use ethrex_rlp::decode::RLPDecode; | ||
| use std::{ | ||
| collections::BTreeMap, | ||
| sync::{Arc, RwLock}, | ||
| sync::{ | ||
| Arc, Mutex, | ||
| atomic::{AtomicUsize, Ordering}, | ||
| }, | ||
| }; | ||
|
|
||
| use ethrex_trie::{EMPTY_TRIE_HASH, Nibbles, Node, TrieDB, TrieError}; | ||
|
|
||
| #[derive(Debug)] | ||
| #[derive(Debug, Clone)] | ||
| struct TrieLayer { | ||
| nodes: BTreeMap<Vec<u8>, Vec<u8>>, | ||
| parent: H256, | ||
|
|
@@ -18,14 +21,20 @@ struct TrieLayer { | |
| pub struct TrieLayerCache { | ||
| /// Monotonically increasing ID for layers, starting at 1. | ||
| /// TODO: this implementation panics on overflow | ||
| last_id: usize, | ||
| layers: BTreeMap<H256, Arc<TrieLayer>>, | ||
| last_id: Arc<AtomicUsize>, | ||
| layers: Arc<Mutex<BTreeMap<H256, Arc<TrieLayer>>>>, | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think the mutex surrounding the btreemap is worth it (as is done now), since it's only used to do the get and clone ASAP to unlock again. Using an Arc<Mutex<Arc>> would be too complex IMHO There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This one forces the tree traversal to be done with the mutex taken. We want to avoid locking in the read path, that's why we would rather have a single outer lock and an |
||
| } | ||
|
|
||
| impl TrieLayerCache { | ||
| pub fn get(&self, state_root: H256, key: Nibbles) -> Option<Vec<u8>> { | ||
| let mut current_state_root = state_root; | ||
| while let Some(layer) = self.layers.get(¤t_state_root) { | ||
| while let Some(layer) = { | ||
| self.layers | ||
| .lock() | ||
| .expect("lock poisoned") | ||
| .get(¤t_state_root) | ||
| .cloned() | ||
| } { | ||
| if let Some(value) = layer.nodes.get(key.as_ref()) { | ||
| return Some(value.clone()); | ||
| } | ||
|
|
@@ -46,7 +55,13 @@ impl TrieLayerCache { | |
| // TODO: use finalized hash to know when to commit | ||
| pub fn get_commitable(&self, mut state_root: H256, commit_threshold: usize) -> Option<H256> { | ||
| let mut counter = 0; | ||
| while let Some(layer) = self.layers.get(&state_root) { | ||
| while let Some(layer) = { | ||
| self.layers | ||
| .lock() | ||
| .expect("lock poisoned") | ||
| .get(&state_root) | ||
| .cloned() | ||
| } { | ||
| state_root = layer.parent; | ||
| counter += 1; | ||
| if counter > commit_threshold { | ||
|
|
@@ -56,55 +71,68 @@ impl TrieLayerCache { | |
| None | ||
| } | ||
|
|
||
| pub fn put_batch( | ||
| &mut self, | ||
| parent: H256, | ||
| state_root: H256, | ||
| key_values: Vec<(Nibbles, Vec<u8>)>, | ||
| ) { | ||
| pub fn put_batch(&self, parent: H256, state_root: H256, key_values: Vec<(Nibbles, Vec<u8>)>) { | ||
| if parent == state_root && key_values.is_empty() { | ||
| return; | ||
| } else if parent == state_root { | ||
| tracing::error!("Inconsistent state: parent == state_root but key_values not empty"); | ||
| return; | ||
| } | ||
| self.layers | ||
|
|
||
| let entry = self | ||
| .layers | ||
| .lock() | ||
| .expect("lock poisoned") | ||
| .entry(state_root) | ||
| .or_insert_with(|| { | ||
| self.last_id += 1; | ||
| TrieLayer { | ||
| nodes: Arc::new(BTreeMap::new()), | ||
| let last_id = self.last_id.fetch_add(1, Ordering::Relaxed); | ||
| Arc::new(TrieLayer { | ||
| nodes: BTreeMap::new(), | ||
| parent, | ||
| id: self.last_id, | ||
| } | ||
| id: last_id + 1, | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This +1 is to maintain the same ids used before There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. These kinds of comments belong in the code. |
||
| }) | ||
| }) | ||
| .nodes | ||
| .extend( | ||
| key_values | ||
| .into_iter() | ||
| .map(|(path, node)| (path.into_vec(), node)), | ||
| ); | ||
| .clone(); | ||
|
|
||
| let mut layer = TrieLayer::clone(&entry); | ||
|
|
||
| layer.nodes.extend( | ||
| key_values | ||
| .into_iter() | ||
| .map(|(path, node)| (path.into_vec(), node)), | ||
| ); | ||
| self.layers | ||
| .lock() | ||
| .expect("lock poisoned") | ||
| .insert(state_root, entry); | ||
| } | ||
|
|
||
| pub fn commit(&mut self, state_root: H256) -> Option<Vec<(Vec<u8>, Vec<u8>)>> { | ||
| let mut layer = self.layers.remove(&state_root)?; | ||
| pub fn commit(&self, state_root: H256) -> Option<Vec<(Vec<u8>, Vec<u8>)>> { | ||
| let layer = self | ||
| .layers | ||
| .lock() | ||
| .expect("lock poisoned") | ||
| .remove(&state_root)?; | ||
| // ensure parents are committed | ||
| let parent_nodes = self.commit(layer.parent); | ||
| // older layers are useless | ||
| self.layers.retain(|_, item| item.id > layer.id); | ||
| self.layers | ||
| .lock() | ||
| .expect("lock poisoned") | ||
| .retain(|_, item| item.id > layer.id); | ||
| Some( | ||
| parent_nodes | ||
| .unwrap_or_default() | ||
| .into_iter() | ||
| .chain(layer.nodes.drain()) | ||
| .chain(layer.nodes.clone()) | ||
| .collect(), | ||
| ) | ||
| } | ||
| } | ||
|
|
||
| pub struct TrieWrapper { | ||
| pub state_root: H256, | ||
| pub inner: Arc<RwLock<TrieLayerCache>>, | ||
| pub inner: Arc<TrieLayerCache>, | ||
| pub db: Box<dyn TrieDB>, | ||
| pub prefix: Option<H256>, | ||
| } | ||
|
|
@@ -127,12 +155,7 @@ impl TrieDB for TrieWrapper { | |
| } | ||
| fn get(&self, key: Nibbles) -> Result<Option<Vec<u8>>, TrieError> { | ||
| let key = apply_prefix(self.prefix, key); | ||
| if let Some(value) = self | ||
| .inner | ||
| .read() | ||
| .map_err(|_| TrieError::LockError)? | ||
| .get(self.state_root, key.clone()) | ||
| { | ||
| if let Some(value) = self.inner.get(self.state_root, key.clone()) { | ||
| return Ok(Some(value)); | ||
| } | ||
| self.db.get(key) | ||
|
|
@@ -148,8 +171,7 @@ impl TrieDB for TrieWrapper { | |
| } | ||
| None => *EMPTY_TRIE_HASH, | ||
| }; | ||
| let mut inner = self.inner.write().map_err(|_| TrieError::LockError)?; | ||
| inner.put_batch( | ||
| self.inner.put_batch( | ||
| self.state_root, | ||
| new_state_root, | ||
| key_values | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't use global allows.