Skip to content

Commit

Permalink
working version
Browse files Browse the repository at this point in the history
  • Loading branch information
cody-wang-cb committed Feb 25, 2025
1 parent ae16127 commit 90a418f
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 43 deletions.
97 changes: 82 additions & 15 deletions src/flashblocks.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use crate::cache::Cache;
use alloy_primitives::map::foldhash::HashMap;
use alloy_primitives::{map::foldhash::HashMap, Bytes};
use alloy_rpc_types_engine::{ExecutionPayloadV1, ExecutionPayloadV2, ExecutionPayloadV3};
use futures_util::StreamExt;
use reth_optimism_primitives::{OpBlock, OpReceipt};
use reth::core::primitives::SignedTransaction;
use reth_optimism_primitives::{OpBlock, OpReceipt, OpTransactionSigned};
use rollup_boost::{ExecutionPayloadBaseV1, FlashblocksPayloadV1};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
Expand Down Expand Up @@ -128,13 +129,12 @@ impl FlashblocksClient {
let msg_processing_start_time = Instant::now();
let metadata: Metadata = serde_json::from_value(payload.metadata)
.expect("failed to deserialize metadata");
let receipts = metadata.receipts;
let new_account_balances = metadata.new_account_balances;
let block_number = metadata.block_number;
let diff = payload.diff;
let withdrawals = diff.withdrawals.clone();

// Skip if index is 0 and base is not cached, likely the first payload
// Can't do pending block with this because already missing blocks
if payload.index != 0
&& cache_clone
.get::<ExecutionPayloadBaseV1>(&format!("base:{:?}", block_number))
Expand All @@ -143,6 +143,7 @@ impl FlashblocksClient {
continue;
}

// base only appears once in the first payload index
let base = if let Some(base) = payload.base {
cache_clone
.set(&format!("base:{:?}", block_number), &base, Some(10))
Expand All @@ -154,6 +155,46 @@ impl FlashblocksClient {
.expect("failed to get base from cache")
};

// update incremental transactions
let transactions = if payload.index == 0 {
diff.transactions.clone()
} else {
// Append new transactions to existing ones for incremental updates
let existing = cache_clone
.get::<Vec<Bytes>>(&format!("diff:transactions:{:?}", block_number))
.expect("failed to get pending transactions from cache");
diff.transactions.iter().cloned().chain(existing).collect()
};
cache_clone
.set(
&format!("diff:transactions:{:?}", block_number),
&transactions,
Some(10),
)
.expect("failed to set pending transactions in cache");

// update incremental receipts
let receipts: Vec<OpReceipt> = if payload.index == 0 {
metadata.receipts.values().cloned().collect()
} else {
let existing = cache_clone
.get::<Vec<OpReceipt>>(&format!("diff:receipts:{:?}", block_number))
.expect("failed to get pending receipts from cache");
metadata
.receipts
.values()
.cloned()
.chain(existing)
.collect()
};
cache_clone
.set(
&format!("diff:receipts:{:?}", block_number),
&receipts,
Some(10),
)
.expect("failed to set pending receipts in cache");

let execution_payload: ExecutionPayloadV3 = ExecutionPayloadV3 {
blob_gas_used: 0,
excess_blob_gas: 0,
Expand All @@ -173,7 +214,7 @@ impl FlashblocksClient {
extra_data: base.extra_data,
base_fee_per_gas: base.base_fee_per_gas,
block_hash: diff.block_hash,
transactions: diff.transactions.clone(),
transactions: transactions,
},
},
};
Expand All @@ -186,21 +227,41 @@ impl FlashblocksClient {
.set("pending", &block, Some(10))
.expect("failed to set block in cache");

// Store receipts
let all_receipts = receipts.values().cloned().collect::<Vec<_>>();
cache_clone
.set("pending_receipts", &all_receipts, Some(10))
.expect("failed to set receipts in cache");
// Store receipts, check if same value, if not update
let existing_receipts =
cache_clone.get::<Vec<OpReceipt>>("pending_receipts");
if existing_receipts.is_none() || existing_receipts.unwrap() != receipts {
cache_clone
.set("pending_receipts", &receipts, Some(10))
.expect("failed to set receipts in cache");
}

// Store tx receipts
for (tx_hash, receipt) in receipts.iter() {
cache_clone
.set(&format!("receipt:{:?}", tx_hash), receipt, Some(10))
.expect("failed to set receipt in cache");
for (tx_hash, receipt) in metadata.receipts.iter() {
// check if exists, if not update
let existing_receipt =
cache_clone.get::<OpReceipt>(&format!("receipt:{:?}", tx_hash));
if existing_receipt.is_none() {
cache_clone
.set(&format!("receipt:{:?}", tx_hash), receipt, Some(10))
.expect("failed to set receipt in cache");
}
}

// Store tx transaction signed
for transaction in block.body.transactions {
// check if exists, if not update
let existing_tx = cache_clone
.get::<OpTransactionSigned>(&transaction.tx_hash().to_string());
if existing_tx.is_none() {
cache_clone
.set(&transaction.tx_hash().to_string(), &transaction, Some(10))
.expect("failed to set tx in cache");
}
}

// Store account balances
for (address, balance) in new_account_balances.iter() {
for (address, balance) in metadata.new_account_balances.iter() {
cache_clone
.set(&format!("{:?}", address), &balance, Some(10))
.expect("failed to set account balance in cache");
Expand All @@ -209,6 +270,12 @@ impl FlashblocksClient {
metrics
.block_processing_duration
.record(msg_processing_start_time.elapsed());

// check duration
println!(
"block processing time: {:?}",
msg_processing_start_time.elapsed()
);
}
}
}
Expand Down
66 changes: 38 additions & 28 deletions src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use op_alloy_consensus::OpTxEnvelope;
use op_alloy_network::Optimism;
use op_alloy_rpc_types::Transaction;
use reth::{api::BlockBody, core::primitives::SignedTransaction, providers::HeaderProvider};
use reth_optimism_chainspec::{OpChainSpec, OP_SEPOLIA};
use reth_optimism_chainspec::{OpChainSpec, BASE_SEPOLIA};
use reth_optimism_primitives::{OpBlock, OpReceipt, OpTransactionSigned};
use reth_optimism_rpc::OpReceiptBuilder;
use reth_rpc_eth_api::RpcReceipt;
Expand Down Expand Up @@ -63,32 +63,42 @@ impl<E> EthApiExt<E> {
Self { eth_api, cache }
}

pub fn transform_block(&self, block: OpBlock) -> RpcBlock<Optimism> {
pub fn transform_block(&self, block: OpBlock, full: bool) -> RpcBlock<Optimism> {
let header: alloy_consensus::Header = block.header.clone();
let transactions = block.body.transactions.to_vec();
let transactions_with_senders = transactions
.into_iter()
.zip(block.body.recover_signers().unwrap());
let converted_txs = transactions_with_senders
.enumerate()
.map(|(idx, (tx, sender))| {
let signed_tx_ec_recovered = Recovered::new_unchecked(tx.clone(), sender);
let tx_info = TransactionInfo {
hash: Some(*tx.tx_hash()),
block_hash: None,
block_number: Some(block.number),
index: Some(idx as u64),
base_fee: None,
};
self.transform_tx(signed_tx_ec_recovered, tx_info)
})
.collect();

RpcBlock::<Optimism> {
header: Header::from_consensus(header.seal_slow(), None, None),
transactions: BlockTransactions::Full(converted_txs),
uncles: Vec::new(),
withdrawals: None,
if full {
let transactions_with_senders = transactions
.into_iter()
.zip(block.body.recover_signers().unwrap());
let converted_txs = transactions_with_senders
.enumerate()
.map(|(idx, (tx, sender))| {
let signed_tx_ec_recovered = Recovered::new_unchecked(tx.clone(), sender);
let tx_info = TransactionInfo {
hash: Some(*tx.tx_hash()),
block_hash: None,
block_number: Some(block.number),
index: Some(idx as u64),
base_fee: None,
};
self.transform_tx(signed_tx_ec_recovered, tx_info)
})
.collect();
RpcBlock::<Optimism> {
header: Header::from_consensus(header.seal_slow(), None, None),
transactions: BlockTransactions::Full(converted_txs),
uncles: Vec::new(),
withdrawals: None,
}
} else {
let tx_hashes = transactions.into_iter().map(|tx| *tx.tx_hash()).collect();
RpcBlock::<Optimism> {
header: Header::from_consensus(header.seal_slow(), None, None),
transactions: BlockTransactions::Hashes(tx_hashes),
uncles: Vec::new(),
withdrawals: None,
}
}
}

Expand All @@ -106,7 +116,7 @@ impl<E> EthApiExt<E> {
if inner.is_deposit() {
let receipt = self
.cache
.get::<OpReceipt>(&format!("receipt:{:?}", tx_info.hash))
.get::<OpReceipt>(&format!("receipt:{:?}", tx_info.hash.unwrap().to_string()))
.unwrap();
if let OpReceipt::Deposit(receipt) = receipt {
deposit_receipt_version = receipt.deposit_receipt_version;
Expand Down Expand Up @@ -207,7 +217,7 @@ where
BlockNumberOrTag::Pending => {
info!("pending block by number, delegating to flashblocks");
if let Some(block) = self.cache.get::<OpBlock>(&number.to_string()) {
return Ok(Some(self.transform_block(block)));
return Ok(Some(self.transform_block(block, _full)));
} else {
return Ok(None);
}
Expand All @@ -226,12 +236,12 @@ where
) -> RpcResult<Option<RpcReceipt<Optimism>>> {
if let Some(receipt) = self
.cache
.get::<OpReceipt>(&format!("receipt:{:?}", tx_hash))
.get::<OpReceipt>(&format!("receipt:{:?}", tx_hash.to_string()))
{
return Ok(Some(self.transform_receipt(
receipt,
tx_hash,
OP_SEPOLIA.as_ref(), // placeholder
BASE_SEPOLIA.as_ref(), // hardcoded for now
)));
}
info!("no receipt found in cache, using standard flow");
Expand Down

0 comments on commit 90a418f

Please sign in to comment.