Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ serde_json = "1.0.117"
rocksdb = { workspace = true, optional = true }
tokio = { workspace = true, optional = true, features = ["rt"] }
bincode = "1.3.3"
arc-swap = "1.7.1"

[features]
default = []
Expand Down
11 changes: 5 additions & 6 deletions crates/storage/store_db/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::{
store::STATE_TRIE_SEGMENTS,
trie_db::layering::{TrieLayerCache, TrieWrapper},
};
use arc_swap::ArcSwap;
use bytes::Bytes;
use ethereum_types::H256;
use ethrex_common::types::{
Expand All @@ -15,7 +16,7 @@ use ethrex_trie::{InMemoryTrieDB, Nibbles, Trie, db::NodeMap};
use std::{
collections::HashMap,
fmt::Debug,
sync::{Arc, Mutex, MutexGuard, RwLock},
sync::{Arc, Mutex, MutexGuard},
};

// NOTE: we use a different commit threshold than rocksdb since tests
Expand All @@ -39,7 +40,7 @@ pub struct StoreInner {
// Maps transaction hashes to their blocks (height+hash) and index within the blocks.
transaction_locations: HashMap<H256, Vec<(BlockNumber, BlockHash, Index)>>,
receipts: HashMap<BlockHash, HashMap<Index, Receipt>>,
trie_cache: Arc<RwLock<TrieLayerCache>>,
trie_cache: Arc<ArcSwap<TrieLayerCache>>,
// Contains account trie nodes
state_trie_nodes: NodeMap,
pending_blocks: HashMap<BlockHash, Block>,
Expand Down Expand Up @@ -92,10 +93,7 @@ impl StoreEngine for Store {

// Store trie updates
{
let mut trie = store
.trie_cache
.write()
.map_err(|_| StoreError::LockError)?;
let mut trie = TrieLayerCache::clone(&store.trie_cache.load());
let parent = update_batch
.blocks
.first()
Expand Down Expand Up @@ -142,6 +140,7 @@ impl StoreEngine for Store {
.chain(update_batch.account_updates)
.collect();
trie.put_batch(pre_state_root, last_state_root, key_values);
store.trie_cache.store(Arc::new(trie));
}

for block in update_batch.blocks {
Expand Down
14 changes: 7 additions & 7 deletions crates/storage/store_db/rocksdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::{
rocksdb_locked::RocksDBLockedTrieDB,
},
};
use arc_swap::ArcSwap;
use bytes::Bytes;
use ethrex_common::{
H256,
Expand All @@ -18,11 +19,7 @@ use rocksdb::{
BlockBasedOptions, BoundColumnFamily, ColumnFamilyDescriptor, DBWithThreadMode, MultiThreaded,
Options, WriteBatch,
};
use std::{
collections::HashSet,
path::Path,
sync::{Arc, RwLock},
};
use std::{collections::HashSet, path::Path, sync::Arc};
use tracing::{debug, error, info};

use crate::{
Expand Down Expand Up @@ -123,7 +120,7 @@ enum FKVGeneratorControlMessage {
#[derive(Debug, Clone)]
pub struct Store {
db: Arc<DBWithThreadMode<MultiThreaded>>,
trie_cache: Arc<RwLock<TrieLayerCache>>,
trie_cache: Arc<ArcSwap<TrieLayerCache>>,
flatkeyvalue_control_tx: std::sync::mpsc::SyncSender<FKVGeneratorControlMessage>,
}

Expand Down Expand Up @@ -663,7 +660,8 @@ impl StoreEngine for Store {

let mut updated_trie = false;

let mut trie = trie_cache.write().map_err(|_| StoreError::LockError)?;
let mut trie = TrieLayerCache::clone(&trie_cache.load());

if let Some(root) = trie.get_commitable(parent_state_root, COMMIT_THRESHOLD) {
updated_trie = true;
// If the channel is closed, there's nobody to notify
Expand Down Expand Up @@ -704,6 +702,8 @@ impl StoreEngine for Store {
.collect(),
);

trie_cache.store(Arc::new(trie));

for block in update_batch.blocks {
let block_number = block.header.number;
let block_hash = block.hash();
Expand Down
31 changes: 14 additions & 17 deletions crates/storage/trie_db/layering.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
use arc_swap::ArcSwap;
use ethrex_common::H256;
use ethrex_rlp::decode::RLPDecode;
use std::{collections::HashMap, sync::Arc, sync::RwLock};
use std::{collections::HashMap, sync::Arc};

use ethrex_trie::{EMPTY_TRIE_HASH, Nibbles, Node, TrieDB, TrieError};

#[derive(Debug)]
#[derive(Debug, Clone)]
struct TrieLayer {
nodes: HashMap<Vec<u8>, Vec<u8>>,
parent: H256,
id: usize,
}

#[derive(Debug, Default)]
#[derive(Debug, Default, Clone)]
pub struct TrieLayerCache {
/// Monotonically increasing ID for layers, starting at 1.
/// TODO: this implementation panics on overflow
Expand Down Expand Up @@ -41,11 +42,7 @@ impl TrieLayerCache {
}

// TODO: use finalized hash to know when to commit
pub fn get_commitable(
&mut self,
mut state_root: H256,
commit_threshold: usize,
) -> Option<H256> {
pub fn get_commitable(&self, mut state_root: H256, commit_threshold: usize) -> Option<H256> {
let mut counter = 0;
while let Some(layer) = self.layers.get(&state_root) {
state_root = layer.parent;
Expand Down Expand Up @@ -105,7 +102,7 @@ impl TrieLayerCache {

pub struct TrieWrapper {
pub state_root: H256,
pub inner: Arc<RwLock<TrieLayerCache>>,
pub inner: Arc<ArcSwap<TrieLayerCache>>,
pub db: Box<dyn TrieDB>,
pub prefix: Option<H256>,
}
Expand All @@ -128,28 +125,27 @@ impl TrieDB for TrieWrapper {
}
fn get(&self, key: Nibbles) -> Result<Option<Vec<u8>>, TrieError> {
let key = apply_prefix(self.prefix, key);
if let Some(value) = self
.inner
.read()
.map_err(|_| TrieError::LockError)?
.get(self.state_root, key.clone())
{
if let Some(value) = self.inner.load().get(self.state_root, key.clone()) {
return Ok(Some(value));
}
self.db.get(key)
}

fn put_batch(&self, key_values: Vec<(Nibbles, Vec<u8>)>) -> Result<(), TrieError> {
// TODO: this is unused, because we call `TrieLayerCache::put_batch` directly
let last_pair = key_values.iter().rev().find(|(_path, rlp)| !rlp.is_empty());
let last_pair = key_values
.iter()
.rev()
.find(|(_path, rlp)| !rlp.is_empty());
let new_state_root = match last_pair {
Some((_, noderlp)) => {
let root_node = Node::decode(noderlp)?;
root_node.compute_hash().finalize()
}
None => *EMPTY_TRIE_HASH,
};
let mut inner = self.inner.write().map_err(|_| TrieError::LockError)?;

let mut inner = TrieLayerCache::clone(&self.inner.load());
inner.put_batch(
self.state_root,
new_state_root,
Expand All @@ -158,6 +154,7 @@ impl TrieDB for TrieWrapper {
.map(move |(path, node)| (apply_prefix(self.prefix, path), node))
.collect(),
);
self.inner.store(Arc::new(inner));
Ok(())
}
}
Loading