Skip to content

Commit

Permalink
enha: cleanup importer block and receipt fetching (#2006)
Browse files Browse the repository at this point in the history
* feat: refactor block and receipt fetching with Alloy types

This commit introduces several improvements to block and receipt fetching:
- Add new `ExternalBlockWithReceipts` type to handle block and receipt retrieval
- Update `BlockchainClient` to return `Option` types for block and receipt fetching
- Simplify RPC downloader and importer block fetching logic
- Remove temporary deserialization structs
- Improve error handling and logging for block retrieval

* chore: lint
  • Loading branch information
gabriel-aranha-cw authored Feb 7, 2025
1 parent 05e6001 commit c97aede
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 121 deletions.
42 changes: 16 additions & 26 deletions src/bin/rpc_downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,17 @@ use std::sync::Arc;
use std::thread;
use std::time::Duration;

use alloy_rpc_types_eth::BlockTransactions;
use anyhow::anyhow;
use anyhow::Context;
use futures::stream;
use futures::StreamExt;
use itertools::Itertools;
use serde::Deserialize;
use stratus::config::RpcDownloaderConfig;
use stratus::eth::external_rpc::ExternalRpc;
use stratus::eth::external_rpc::PostgresExternalRpc;
use stratus::eth::primitives::Address;
use stratus::eth::primitives::BlockNumber;
use stratus::eth::primitives::Hash;
use stratus::ext::not;
use stratus::infra::BlockchainClient;
use stratus::utils::DropTimer;
Expand Down Expand Up @@ -144,27 +143,32 @@ async fn download(rpc_storage: Arc<PostgresExternalRpc>, chain: Arc<BlockchainCl
}

// retrieve block
let block_json = match chain.fetch_block(current).await {
Ok(json) => json,
let block = match chain.fetch_block(current).await {
Ok(Some(block)) => block,
Ok(None) => {
tracing::warn!(%current, "block not available yet, retrying");
continue;
}
Err(e) => {
tracing::warn!(reason = ?e, "failed to fetch, retrying block download");
continue;
}
};

// extract transaction hashes
let block: ImporterBlock = match ImporterBlock::deserialize(&block_json) {
Ok(block) => block,
Err(e) => {
tracing::error!(reason = ?e, block_number = %current, payload = ?block_json, "block does not match expected format");
return Err(e).context(format!("block does not match expected format for block {}", current));
let tx_hashes = match &block.transactions {
BlockTransactions::Full(txs) => txs.iter().map(|tx| tx.hash()).collect_vec(),
other => {
tracing::error!(%current, ?other, "unsupported transaction format");
return Err(anyhow!("unsupported transaction format in block {}", current));
}
};
let hashes = block.transactions.into_iter().map(|tx| tx.hash).collect_vec();

let block_json = serde_json::to_value(&block).context("failed to serialize block to JSON")?;

// retrieve receipts
let mut receipts_json = Vec::with_capacity(hashes.len());
for tx_hash in hashes {
let mut receipts_json = Vec::with_capacity(tx_hashes.len());
for tx_hash in tx_hashes {
loop {
let receipt = match chain.fetch_receipt(tx_hash).await {
Ok(receipt) => receipt,
Expand Down Expand Up @@ -229,17 +233,3 @@ fn blocks_per_minute_reporter() {
);
}
}

// -----------------------------------------------------------------------------
// Blockchain RPC structs
// -----------------------------------------------------------------------------

#[derive(serde::Deserialize)]
struct ImporterBlock {
transactions: Vec<ImporterTransaction>,
}

#[derive(serde::Deserialize)]
struct ImporterTransaction {
hash: Hash,
}
94 changes: 5 additions & 89 deletions src/eth/follower/importer/importer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::borrow::Cow;
use std::cmp::min;
use std::mem;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;
Expand All @@ -10,7 +9,6 @@ use alloy_rpc_types_eth::BlockTransactions;
use anyhow::anyhow;
use futures::try_join;
use futures::StreamExt;
use serde::Deserialize;
use tokio::sync::mpsc;
use tokio::task::yield_now;
use tokio::time::timeout;
Expand All @@ -24,7 +22,6 @@ use crate::eth::primitives::BlockNumber;
use crate::eth::primitives::ExternalBlock;
use crate::eth::primitives::ExternalReceipt;
use crate::eth::primitives::ExternalReceipts;
use crate::eth::primitives::Hash;
use crate::eth::storage::StratusStorage;
use crate::ext::spawn_named;
use crate::ext::traced_sleep;
Expand Down Expand Up @@ -475,8 +472,6 @@ impl Importer {
// -----------------------------------------------------------------------------
// Helpers
// -----------------------------------------------------------------------------
#[allow(clippy::expect_used)]
#[tracing::instrument(name = "importer::fetch_block_and_receipts", skip_all, fields(block_number))]
async fn fetch_block_and_receipts(chain: Arc<BlockchainClient>, block_number: BlockNumber) -> (ExternalBlock, Vec<ExternalReceipt>) {
const RETRY_DELAY: Duration = Duration::from_millis(10);
Span::with(|s| {
Expand All @@ -486,96 +481,17 @@ async fn fetch_block_and_receipts(chain: Arc<BlockchainClient>, block_number: Bl
loop {
tracing::info!(%block_number, "fetching block and receipts");

let mut json = match chain.fetch_block_and_receipts(block_number).await {
Ok(json) => json,
Err(e) => {
tracing::warn!(reason = ?e, %block_number, delay_ms = %RETRY_DELAY.as_millis(), "failed to fetch block and receipts, retrying with delay.");
traced_sleep(RETRY_DELAY, SleepReason::RetryBackoff).await;
continue;
}
};

let block = match json.get_mut("block") {
Some(block) => mem::take(block),
None => {
tracing::warn!(%block_number, delay_ms = %RETRY_DELAY.as_millis(), "missing block in response, retrying with delay.");
traced_sleep(RETRY_DELAY, SleepReason::RetryBackoff).await;
continue;
}
};

if block.is_null() {
tracing::warn!(%block_number, delay_ms = %RETRY_DELAY.as_millis(), "block not mined yet. retrying with delay.");
traced_sleep(RETRY_DELAY, SleepReason::SyncData).await;
continue;
}

let block: ExternalBlock = serde_json::from_value(block).expect("cannot fail to deserialize external block");

let receipts = match json.get_mut("receipts") {
Some(receipts) => mem::take(receipts),
None => {
tracing::warn!(%block_number, delay_ms = %RETRY_DELAY.as_millis(), "missing receipts in response, retrying with delay.");
match chain.fetch_block_and_receipts(block_number).await {
Ok(Some(response)) => return (response.block, response.receipts),
Ok(None) => {
tracing::warn!(%block_number, delay_ms = %RETRY_DELAY.as_millis(), "block and receipts not available yet, retrying with delay.");
traced_sleep(RETRY_DELAY, SleepReason::RetryBackoff).await;
continue;
}
};

let receipts: Vec<ExternalReceipt> = serde_json::from_value(receipts).expect("cannot fail to deserialize external receipts");

return (block, receipts);
}
}

#[allow(clippy::expect_used)]
#[tracing::instrument(name = "importer::fetch_block", skip_all, fields(block_number))]
async fn fetch_block(chain: Arc<BlockchainClient>, block_number: BlockNumber) -> ExternalBlock {
const RETRY_DELAY: Duration = Duration::from_millis(10);
Span::with(|s| {
s.rec_str("block_number", &block_number);
});

loop {
tracing::info!(%block_number, "fetching block");
let block = match chain.fetch_block(block_number).await {
Ok(json) => json,
Err(e) => {
tracing::warn!(reason = ?e, %block_number, delay_ms=%RETRY_DELAY.as_millis(), "failed to retrieve block. retrying with delay.");
tracing::warn!(reason = ?e, %block_number, delay_ms = %RETRY_DELAY.as_millis(), "failed to fetch block and receipts, retrying with delay.");
traced_sleep(RETRY_DELAY, SleepReason::RetryBackoff).await;
continue;
}
};

if block.is_null() {
tracing::warn!(%block_number, delay_ms=%RETRY_DELAY.as_millis(), "block not mined yet. retrying with delay.");
traced_sleep(RETRY_DELAY, SleepReason::SyncData).await;
continue;
}

return ExternalBlock::deserialize(&block).expect("cannot fail to deserialize external block");
}
}

#[tracing::instrument(name = "importer::fetch_receipt", skip_all, fields(block_number, tx_hash))]
async fn fetch_receipt(chain: Arc<BlockchainClient>, block_number: BlockNumber, tx_hash: Hash) -> ExternalReceipt {
Span::with(|s| {
s.rec_str("block_number", &block_number);
s.rec_str("tx_hash", &tx_hash);
});

loop {
tracing::info!(%block_number, %tx_hash, "fetching receipt");

match chain.fetch_receipt(tx_hash).await {
Ok(Some(receipt)) => return receipt,
Ok(None) => {
tracing::warn!(%block_number, %tx_hash, "receipt not available yet because block is not mined. retrying now.");
continue;
}
Err(e) => {
tracing::error!(reason = ?e, %block_number, %tx_hash, "failed to fetch receipt. retrying now.");
}
}
}
}

Expand Down
27 changes: 27 additions & 0 deletions src/eth/primitives/external_block_with_receipts.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use serde::Deserialize;

use crate::alias::JsonValue;
use crate::eth::primitives::ExternalBlock;
use crate::eth::primitives::ExternalReceipt;
use crate::log_and_err;

#[derive(Debug, Clone, Deserialize)]
pub struct ExternalBlockWithReceipts {
pub block: ExternalBlock,
pub receipts: Vec<ExternalReceipt>,
}

// -----------------------------------------------------------------------------
// Conversions: Other -> Self
// -----------------------------------------------------------------------------

impl TryFrom<JsonValue> for ExternalBlockWithReceipts {
type Error = anyhow::Error;

fn try_from(value: JsonValue) -> Result<Self, Self::Error> {
match ExternalBlockWithReceipts::deserialize(&value) {
Ok(v) => Ok(v),
Err(e) => log_and_err!(reason = e, payload = value, "failed to convert payload value to ExternalBlockWithReceipts"),
}
}
}
2 changes: 2 additions & 0 deletions src/eth/primitives/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ mod execution_metrics;
mod execution_result;
mod execution_value_change;
mod external_block;
mod external_block_with_receipts;
mod external_receipt;
mod external_receipts;
mod external_transaction;
Expand Down Expand Up @@ -73,6 +74,7 @@ pub use execution_metrics::EvmExecutionMetrics;
pub use execution_result::ExecutionResult;
pub use execution_value_change::ExecutionValueChange;
pub use external_block::ExternalBlock;
pub use external_block_with_receipts::ExternalBlockWithReceipts;
pub use external_receipt::ExternalReceipt;
pub use external_receipts::ExternalReceipts;
pub use external_transaction::ExternalTransaction;
Expand Down
20 changes: 14 additions & 6 deletions src/infra/blockchain_client/blockchain_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use crate::alias::JsonValue;
use crate::eth::primitives::Address;
use crate::eth::primitives::BlockNumber;
use crate::eth::primitives::ExternalBlock;
use crate::eth::primitives::ExternalBlockWithReceipts;
use crate::eth::primitives::ExternalReceipt;
use crate::eth::primitives::Hash;
use crate::eth::primitives::StratusError;
Expand Down Expand Up @@ -146,22 +147,30 @@ impl BlockchainClient {
}

/// Fetches a block by number with receipts.
pub async fn fetch_block_and_receipts(&self, block_number: BlockNumber) -> anyhow::Result<JsonValue> {
pub async fn fetch_block_and_receipts(&self, block_number: BlockNumber) -> anyhow::Result<Option<ExternalBlockWithReceipts>> {
tracing::debug!(%block_number, "fetching block");

let number = to_json_value(block_number);
match self.http.request::<JsonValue, _>("stratus_getBlockAndReceipts", [number]).await {
Ok(json) => Ok(json),
let result = self
.http
.request::<Option<ExternalBlockWithReceipts>, _>("stratus_getBlockAndReceipts", [number])
.await;

match result {
Ok(block) => Ok(block),
Err(e) => log_and_err!(reason = e, "failed to fetch block with receipts"),
}
}

/// Fetches a block by number.
pub async fn fetch_block(&self, block_number: BlockNumber) -> anyhow::Result<JsonValue> {
pub async fn fetch_block(&self, block_number: BlockNumber) -> anyhow::Result<Option<ExternalBlock>> {
tracing::debug!(%block_number, "fetching block");

let number = to_json_value(block_number);
let result = self.http.request::<JsonValue, _>("eth_getBlockByNumber", [number, JsonValue::Bool(true)]).await;
let result = self
.http
.request::<Option<ExternalBlock>, _>("eth_getBlockByNumber", [number, JsonValue::Bool(true)])
.await;

match result {
Ok(block) => Ok(block),
Expand All @@ -174,7 +183,6 @@ impl BlockchainClient {
tracing::debug!(%tx_hash, "fetching transaction");

let hash = to_json_value(tx_hash);

let result = self.http.request::<Option<AlloyTransaction>, _>("eth_getTransactionByHash", [hash]).await;

match result {
Expand Down

0 comments on commit c97aede

Please sign in to comment.