19 changes: 10 additions & 9 deletions crates/storage/store_db/in_memory.rs
@@ -15,7 +15,7 @@ use ethrex_trie::{InMemoryTrieDB, Nibbles, Trie, db::NodeMap};
use std::{
collections::HashMap,
fmt::Debug,
sync::{Arc, Mutex, MutexGuard, RwLock},
sync::{Arc, Mutex, MutexGuard},
};

// NOTE: we use a different commit threshold than rocksdb since tests
@@ -39,7 +39,7 @@ pub struct StoreInner {
// Maps transaction hashes to their blocks (height+hash) and index within the blocks.
transaction_locations: HashMap<H256, Vec<(BlockNumber, BlockHash, Index)>>,
receipts: HashMap<BlockHash, HashMap<Index, Receipt>>,
trie_cache: Arc<RwLock<TrieLayerCache>>,
trie_cache: Arc<TrieLayerCache>,
// Contains account trie nodes
state_trie_nodes: NodeMap,
pending_blocks: HashMap<BlockHash, Block>,
@@ -92,10 +92,6 @@ impl StoreEngine for Store {

// Store trie updates
{
let mut trie = store
.trie_cache
.write()
.map_err(|_| StoreError::LockError)?;
let parent = update_batch
.blocks
.first()
@@ -121,8 +117,11 @@
.lock()
.map_err(|_| StoreError::LockError)?;

if let Some(root) = trie.get_commitable(pre_state_root, COMMIT_THRESHOLD) {
let nodes = trie.commit(root).unwrap_or_default();
if let Some(root) = store
.trie_cache
.get_commitable(pre_state_root, COMMIT_THRESHOLD)
{
let nodes = store.trie_cache.commit(root).unwrap_or_default();
for (key, value) in nodes {
if value.is_empty() {
state_trie.remove(&key);
@@ -141,7 +140,9 @@
})
.chain(update_batch.account_updates)
.collect();
trie.put_batch(pre_state_root, last_state_root, key_values);
store
.trie_cache
.put_batch(pre_state_root, last_state_root, key_values);
}

for block in update_batch.blocks {
31 changes: 10 additions & 21 deletions crates/storage/store_db/rocksdb.rs
@@ -1,3 +1,5 @@
#![allow(clippy::type_complexity)]
Contributor: Don't use global allows.
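As a hedged illustration of the point above (not code from this PR; the type and struct names below are made up), the lint can be scoped to the one item that trips it, or avoided entirely with a type alias instead of a crate-wide `#![allow]`:

```rust
use std::sync::mpsc::SyncSender;

// Hypothetical alias: naming the tuple keeps `clippy::type_complexity`
// from firing at all, so no allow is needed.
type TrieUpdateMsg = (SyncSender<Result<(), String>>, Vec<(Vec<u8>, Vec<u8>)>);

// If an allow is still wanted, scope it to the single item that trips the
// lint rather than the whole crate with `#![allow(...)]`.
#[allow(clippy::type_complexity)]
pub struct Worker {
    // The inline tuple type is what the lint complains about.
    tx: SyncSender<(SyncSender<Result<(), String>>, Vec<(Vec<u8>, Vec<u8>)>)>,
}

// Or use the alias and drop the allow entirely.
pub struct WorkerAliased {
    tx: SyncSender<TrieUpdateMsg>,
}

fn main() {
    let (tx, _rx) = std::sync::mpsc::sync_channel::<TrieUpdateMsg>(1);
    let _worker = WorkerAliased { tx };
}
```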


use crate::{
rlp::AccountCodeHashRLP,
trie_db::{
@@ -23,7 +25,7 @@ use std::{
mem::take,
path::Path,
sync::{
Arc, Mutex, RwLock,
Arc,
mpsc::{SyncSender, sync_channel},
},
};
@@ -127,7 +129,7 @@ enum FKVGeneratorControlMessage {
#[derive(Debug, Clone)]
pub struct Store {
db: Arc<DBWithThreadMode<MultiThreaded>>,
trie_cache: Arc<Mutex<Arc<TrieLayerCache>>>,
trie_cache: Arc<TrieLayerCache>,
flatkeyvalue_control_tx: std::sync::mpsc::SyncSender<FKVGeneratorControlMessage>,
trie_update_worker_tx: std::sync::mpsc::SyncSender<(
std::sync::mpsc::SyncSender<Result<(), StoreError>>,
@@ -669,22 +671,12 @@ impl Store {
.chain(account_updates)
.collect();
// Read-Copy-Update the trie cache with a new layer.
let trie = trie_cache
.lock()
.map_err(|_| StoreError::LockError)?
.clone();
let mut trie_mut = (&*trie).clone();
trie_mut.put_batch(parent_state_root, child_state_root, new_layer);
*trie_cache.lock().map_err(|_| StoreError::LockError)? = Arc::new(trie_mut);
trie_cache.put_batch(parent_state_root, child_state_root, new_layer);
// Update finished, signal block production.
notify.send(Ok(())).map_err(|_| StoreError::LockError)?;

// Phase 2: update disk layer.
let trie = trie_cache
.lock()
.map_err(|_| StoreError::LockError)?
.clone();
let Some(root) = trie.get_commitable(parent_state_root, COMMIT_THRESHOLD) else {
let Some(root) = trie_cache.get_commitable(parent_state_root, COMMIT_THRESHOLD) else {
// Nothing to commit to disk, move on.
return Ok(());
};
@@ -693,14 +685,13 @@
let _ = fkv_ctl.send(FKVGeneratorControlMessage::Stop);

// RCU to remove the bottom layer: update step needs to happen after disk layer is updated.
let mut trie_mut = (&*trie).clone();
let mut batch = WriteBatch::default();
let [cf_trie_nodes, cf_flatkeyvalue, cf_misc] =
open_cfs(db, [CF_TRIE_NODES, CF_FLATKEYVALUE, CF_MISC_VALUES])?;

let last_written = db.get_cf(&cf_misc, "last_written")?.unwrap_or_default();
// Commit removes the bottom layer and returns it, this is the mutation step.
let nodes = trie_mut.commit(root).unwrap_or_default();
let nodes = trie_cache.commit(root).unwrap_or_default();
for (key, value) in nodes {
let is_leaf = key.len() == 65 || key.len() == 131;

@@ -719,8 +710,6 @@
}
}
db.write(batch)?;
// Phase 3: update diff layers with the removal of bottom layer.
*trie_cache.lock().map_err(|_| StoreError::LockError)? = Arc::new(trie_mut);
Contributor: This mutation needs to happen at the end. Otherwise you lose the bottom layer BEFORE there is a backing disk layer with the data visible to users.
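A minimal sketch of that ordering with simplified stand-in types (not the ethrex API): persist the bottom layer first, and only then drop it from the cache, so readers always find the data in one place or the other.

```rust
use std::collections::BTreeMap;
use std::sync::Mutex;

// Simplified stand-ins for the layered cache and the disk store.
struct Cache {
    layers: Mutex<BTreeMap<u64, Vec<(Vec<u8>, Vec<u8>)>>>, // layer id -> nodes
}
struct Disk {
    table: Mutex<BTreeMap<Vec<u8>, Vec<u8>>>,
}

fn commit_bottom_layer(cache: &Cache, disk: &Disk, layer_id: u64) {
    // Read the bottom layer without removing it, so concurrent readers still see it.
    let nodes = {
        let layers = cache.layers.lock().unwrap();
        match layers.get(&layer_id) {
            Some(n) => n.clone(),
            None => return,
        }
    };
    // 1. Persist first: a reader that misses the cache now finds the disk copy.
    disk.table.lock().unwrap().extend(nodes);
    // 2. Only now drop the layer from the cache; the data is never invisible.
    cache.layers.lock().unwrap().remove(&layer_id);
}

fn main() {
    let cache = Cache {
        layers: Mutex::new(BTreeMap::from([(1, vec![(b"k".to_vec(), b"v".to_vec())])])),
    };
    let disk = Disk { table: Mutex::new(BTreeMap::new()) };
    commit_bottom_layer(&cache, &disk, 1);
    assert!(disk.table.lock().unwrap().contains_key(b"k".as_slice()));
}
```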

Ok(())
}
}
@@ -841,7 +830,7 @@ impl StoreEngine for Store {
// Wait for an updated top layer so every caller afterwards sees a consistent view.
// Specifically, the next block produced MUST see this upper layer.
rx.recv()
.map_err(|e| StoreError::Custom(format!("recv failed: {e}")))?;
.map_err(|e| StoreError::Custom(format!("recv failed: {e}")))??;
}
// After top-level is added, we can make the rest of the changes visible.
db.write(batch)
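A hedged aside on the `??` above (a sketch with plain std types, not the PR's StoreError): the worker thread sends its own Result back over the channel, so recv() yields a nested Result and both layers have to be propagated.

```rust
use std::sync::mpsc::sync_channel;

fn main() -> Result<(), String> {
    // The channel carries the worker's Result (Result<(), StoreError> in the PR).
    let (tx, rx) = sync_channel::<Result<(), String>>(1);
    tx.send(Ok(())).unwrap(); // worker reports success or an error

    // First `?`:  the channel itself failed (sender dropped) -> RecvError, mapped here.
    // Second `?`: the worker ran but returned an error -> propagate that error too.
    rx.recv().map_err(|e| format!("recv failed: {e}"))??;
    Ok(())
}
```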
@@ -1397,7 +1386,7 @@ impl StoreEngine for Store {
let db = Box::new(RocksDBTrieDB::new(self.db.clone(), CF_TRIE_NODES, None)?);
let wrap_db = Box::new(TrieWrapper {
state_root,
inner: self.trie_cache.lock().unwrap().clone(),
inner: self.trie_cache.clone(),
db,
prefix: Some(hashed_address),
});
@@ -1409,7 +1398,7 @@
let db = Box::new(RocksDBTrieDB::new(self.db.clone(), CF_TRIE_NODES, None)?);
let wrap_db = Box::new(TrieWrapper {
state_root,
inner: self.trie_cache.lock().unwrap().clone(),
inner: self.trie_cache.clone(),
db,
prefix: None,
});
96 changes: 59 additions & 37 deletions crates/storage/trie_db/layering.rs
@@ -2,12 +2,15 @@ use ethrex_common::H256;
use ethrex_rlp::decode::RLPDecode;
use std::{
collections::BTreeMap,
sync::{Arc, RwLock},
sync::{
Arc, Mutex,
atomic::{AtomicUsize, Ordering},
},
};

use ethrex_trie::{EMPTY_TRIE_HASH, Nibbles, Node, TrieDB, TrieError};

#[derive(Debug)]
#[derive(Debug, Clone)]
struct TrieLayer {
nodes: BTreeMap<Vec<u8>, Vec<u8>>,
parent: H256,
@@ -18,14 +21,20 @@ struct TrieLayer {
pub struct TrieLayerCache {
/// Monotonically increasing ID for layers, starting at 1.
/// TODO: this implementation panics on overflow
last_id: usize,
layers: BTreeMap<H256, Arc<TrieLayer>>,
last_id: Arc<AtomicUsize>,
layers: Arc<Mutex<BTreeMap<H256, Arc<TrieLayer>>>>,
Contributor Author (@edg-l, Oct 22, 2025): I think the mutex surrounding the BTreeMap is worth it (as is done now), since it's only used to do the get and clone ASAP to unlock again. Using an Arc<Mutex<Arc>> would be too complex IMHO.

Contributor: This one forces the tree traversal to be done with the mutex taken. We want to avoid locking in the read path; that's why we would rather have a single outer lock and an Arc clone there.
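For illustration, a minimal sketch (simplified types, not the PR's API) of the outer-lock shape the second comment describes: the mutex only guards an Arc pointer, so readers take one short lock to clone a snapshot and then traverse without holding it, while writers copy-and-swap (RCU).

```rust
use std::collections::BTreeMap;
use std::sync::{Arc, Mutex};

#[derive(Clone, Default)]
struct Layers {
    map: BTreeMap<Vec<u8>, Vec<u8>>,
}

#[derive(Default)]
struct Cache {
    // The lock protects only the pointer, never the traversal.
    inner: Mutex<Arc<Layers>>,
}

impl Cache {
    fn get(&self, key: &[u8]) -> Option<Vec<u8>> {
        // One short critical section to grab a snapshot of the whole map...
        let snapshot = self.inner.lock().unwrap().clone();
        // ...then the (potentially long) lookup runs with no lock held.
        snapshot.map.get(key).cloned()
    }

    fn insert(&self, key: Vec<u8>, value: Vec<u8>) {
        // Writers copy, mutate the copy, then swap the pointer in (RCU).
        let mut guard = self.inner.lock().unwrap();
        let mut next = (**guard).clone();
        next.map.insert(key, value);
        *guard = Arc::new(next);
    }
}

fn main() {
    let cache = Cache::default();
    cache.insert(b"node".to_vec(), b"rlp".to_vec());
    assert_eq!(cache.get(b"node"), Some(b"rlp".to_vec()));
}
```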

}

impl TrieLayerCache {
pub fn get(&self, state_root: H256, key: Nibbles) -> Option<Vec<u8>> {
let mut current_state_root = state_root;
while let Some(layer) = self.layers.get(&current_state_root) {
while let Some(layer) = {
self.layers
.lock()
.expect("lock poisoned")
.get(&current_state_root)
.cloned()
} {
if let Some(value) = layer.nodes.get(key.as_ref()) {
return Some(value.clone());
}
@@ -46,7 +55,13 @@
// TODO: use finalized hash to know when to commit
pub fn get_commitable(&self, mut state_root: H256, commit_threshold: usize) -> Option<H256> {
let mut counter = 0;
while let Some(layer) = self.layers.get(&state_root) {
while let Some(layer) = {
self.layers
.lock()
.expect("lock poisoned")
.get(&state_root)
.cloned()
} {
state_root = layer.parent;
counter += 1;
if counter > commit_threshold {
@@ -56,55 +71,68 @@
None
}

pub fn put_batch(
&mut self,
parent: H256,
state_root: H256,
key_values: Vec<(Nibbles, Vec<u8>)>,
) {
pub fn put_batch(&self, parent: H256, state_root: H256, key_values: Vec<(Nibbles, Vec<u8>)>) {
if parent == state_root && key_values.is_empty() {
return;
} else if parent == state_root {
tracing::error!("Inconsistent state: parent == state_root but key_values not empty");
return;
}
self.layers

let entry = self
.layers
.lock()
.expect("lock poisoned")
.entry(state_root)
.or_insert_with(|| {
self.last_id += 1;
TrieLayer {
nodes: Arc::new(BTreeMap::new()),
let last_id = self.last_id.fetch_add(1, Ordering::Relaxed);
Arc::new(TrieLayer {
nodes: BTreeMap::new(),
parent,
id: self.last_id,
}
id: last_id + 1,
Contributor Author: This +1 is to maintain the same ids used before.

Contributor: These kinds of comments belong in the code.
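To make the follow-up concrete, a tiny hedged sketch of why the + 1 keeps the old numbering: fetch_add returns the counter's value before the increment, whereas the previous `self.last_id += 1; id: self.last_id` pattern used the value after it.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

fn main() {
    // Previous scheme: `self.last_id += 1; id = self.last_id;` -> first layer id is 1.
    let last_id = AtomicUsize::new(0);

    // `fetch_add` returns the value *before* the increment...
    let previous = last_id.fetch_add(1, Ordering::Relaxed);
    // ...so `+ 1` reproduces the old post-increment ids exactly.
    assert_eq!(previous + 1, 1); // first layer still gets id 1

    let previous = last_id.fetch_add(1, Ordering::Relaxed);
    assert_eq!(previous + 1, 2); // and the second gets id 2, as before
}
```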

})
})
.nodes
.extend(
key_values
.into_iter()
.map(|(path, node)| (path.into_vec(), node)),
);
.clone();

let mut layer = TrieLayer::clone(&entry);

layer.nodes.extend(
key_values
.into_iter()
.map(|(path, node)| (path.into_vec(), node)),
);
self.layers
.lock()
.expect("lock poisoned")
.insert(state_root, entry);
}

pub fn commit(&mut self, state_root: H256) -> Option<Vec<(Vec<u8>, Vec<u8>)>> {
let mut layer = self.layers.remove(&state_root)?;
pub fn commit(&self, state_root: H256) -> Option<Vec<(Vec<u8>, Vec<u8>)>> {
let layer = self
.layers
.lock()
.expect("lock poisoned")
.remove(&state_root)?;
// ensure parents are committed
let parent_nodes = self.commit(layer.parent);
// older layers are useless
self.layers.retain(|_, item| item.id > layer.id);
self.layers
.lock()
.expect("lock poisoned")
.retain(|_, item| item.id > layer.id);
Some(
parent_nodes
.unwrap_or_default()
.into_iter()
.chain(layer.nodes.drain())
.chain(layer.nodes.clone())
.collect(),
)
}
}

pub struct TrieWrapper {
pub state_root: H256,
pub inner: Arc<RwLock<TrieLayerCache>>,
pub inner: Arc<TrieLayerCache>,
pub db: Box<dyn TrieDB>,
pub prefix: Option<H256>,
}
@@ -127,12 +155,7 @@ impl TrieDB for TrieWrapper {
}
fn get(&self, key: Nibbles) -> Result<Option<Vec<u8>>, TrieError> {
let key = apply_prefix(self.prefix, key);
if let Some(value) = self
.inner
.read()
.map_err(|_| TrieError::LockError)?
.get(self.state_root, key.clone())
{
if let Some(value) = self.inner.get(self.state_root, key.clone()) {
return Ok(Some(value));
}
self.db.get(key)
@@ -148,8 +171,7 @@
}
None => *EMPTY_TRIE_HASH,
};
let mut inner = self.inner.write().map_err(|_| TrieError::LockError)?;
inner.put_batch(
self.inner.put_batch(
self.state_root,
new_state_root,
key_values