diff --git a/src/bin/rpc_downloader.rs b/src/bin/rpc_downloader.rs index 2f4aff3e7..83de332c3 100644 --- a/src/bin/rpc_downloader.rs +++ b/src/bin/rpc_downloader.rs @@ -5,18 +5,17 @@ use std::sync::Arc; use std::thread; use std::time::Duration; +use alloy_rpc_types_eth::BlockTransactions; use anyhow::anyhow; use anyhow::Context; use futures::stream; use futures::StreamExt; use itertools::Itertools; -use serde::Deserialize; use stratus::config::RpcDownloaderConfig; use stratus::eth::external_rpc::ExternalRpc; use stratus::eth::external_rpc::PostgresExternalRpc; use stratus::eth::primitives::Address; use stratus::eth::primitives::BlockNumber; -use stratus::eth::primitives::Hash; use stratus::ext::not; use stratus::infra::BlockchainClient; use stratus::utils::DropTimer; @@ -144,8 +143,12 @@ async fn download(rpc_storage: Arc, chain: Arc json, + let block = match chain.fetch_block(current).await { + Ok(Some(block)) => block, + Ok(None) => { + tracing::warn!(%current, "block not available yet, retrying"); + continue; + } Err(e) => { tracing::warn!(reason = ?e, "failed to fetch, retrying block download"); continue; @@ -153,18 +156,19 @@ async fn download(rpc_storage: Arc, chain: Arc block, - Err(e) => { - tracing::error!(reason = ?e, block_number = %current, payload = ?block_json, "block does not match expected format"); - return Err(e).context(format!("block does not match expected format for block {}", current)); + let tx_hashes = match &block.transactions { + BlockTransactions::Full(txs) => txs.iter().map(|tx| tx.hash()).collect_vec(), + other => { + tracing::error!(%current, ?other, "unsupported transaction format"); + return Err(anyhow!("unsupported transaction format in block {}", current)); } }; - let hashes = block.transactions.into_iter().map(|tx| tx.hash).collect_vec(); + + let block_json = serde_json::to_value(&block).context("failed to serialize block to JSON")?; // retrieve receipts - let mut receipts_json = Vec::with_capacity(hashes.len()); - for tx_hash in hashes { + let mut receipts_json = Vec::with_capacity(tx_hashes.len()); + for tx_hash in tx_hashes { loop { let receipt = match chain.fetch_receipt(tx_hash).await { Ok(receipt) => receipt, @@ -229,17 +233,3 @@ fn blocks_per_minute_reporter() { ); } } - -// ----------------------------------------------------------------------------- -// Blockchain RPC structs -// ----------------------------------------------------------------------------- - -#[derive(serde::Deserialize)] -struct ImporterBlock { - transactions: Vec, -} - -#[derive(serde::Deserialize)] -struct ImporterTransaction { - hash: Hash, -} diff --git a/src/eth/follower/importer/importer.rs b/src/eth/follower/importer/importer.rs index b4d2c8149..14b6f6fae 100644 --- a/src/eth/follower/importer/importer.rs +++ b/src/eth/follower/importer/importer.rs @@ -1,6 +1,5 @@ use std::borrow::Cow; use std::cmp::min; -use std::mem; use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; use std::sync::Arc; @@ -10,7 +9,6 @@ use alloy_rpc_types_eth::BlockTransactions; use anyhow::anyhow; use futures::try_join; use futures::StreamExt; -use serde::Deserialize; use tokio::sync::mpsc; use tokio::task::yield_now; use tokio::time::timeout; @@ -24,7 +22,6 @@ use crate::eth::primitives::BlockNumber; use crate::eth::primitives::ExternalBlock; use crate::eth::primitives::ExternalReceipt; use crate::eth::primitives::ExternalReceipts; -use crate::eth::primitives::Hash; use crate::eth::storage::StratusStorage; use crate::ext::spawn_named; use crate::ext::traced_sleep; @@ -475,8 +472,6 @@ impl Importer { // ----------------------------------------------------------------------------- // Helpers // ----------------------------------------------------------------------------- -#[allow(clippy::expect_used)] -#[tracing::instrument(name = "importer::fetch_block_and_receipts", skip_all, fields(block_number))] async fn fetch_block_and_receipts(chain: Arc, block_number: BlockNumber) -> (ExternalBlock, Vec) { const RETRY_DELAY: Duration = Duration::from_millis(10); Span::with(|s| { @@ -486,96 +481,17 @@ async fn fetch_block_and_receipts(chain: Arc, block_number: Bl loop { tracing::info!(%block_number, "fetching block and receipts"); - let mut json = match chain.fetch_block_and_receipts(block_number).await { - Ok(json) => json, - Err(e) => { - tracing::warn!(reason = ?e, %block_number, delay_ms = %RETRY_DELAY.as_millis(), "failed to fetch block and receipts, retrying with delay."); - traced_sleep(RETRY_DELAY, SleepReason::RetryBackoff).await; - continue; - } - }; - - let block = match json.get_mut("block") { - Some(block) => mem::take(block), - None => { - tracing::warn!(%block_number, delay_ms = %RETRY_DELAY.as_millis(), "missing block in response, retrying with delay."); - traced_sleep(RETRY_DELAY, SleepReason::RetryBackoff).await; - continue; - } - }; - - if block.is_null() { - tracing::warn!(%block_number, delay_ms = %RETRY_DELAY.as_millis(), "block not mined yet. retrying with delay."); - traced_sleep(RETRY_DELAY, SleepReason::SyncData).await; - continue; - } - - let block: ExternalBlock = serde_json::from_value(block).expect("cannot fail to deserialize external block"); - - let receipts = match json.get_mut("receipts") { - Some(receipts) => mem::take(receipts), - None => { - tracing::warn!(%block_number, delay_ms = %RETRY_DELAY.as_millis(), "missing receipts in response, retrying with delay."); + match chain.fetch_block_and_receipts(block_number).await { + Ok(Some(response)) => return (response.block, response.receipts), + Ok(None) => { + tracing::warn!(%block_number, delay_ms = %RETRY_DELAY.as_millis(), "block and receipts not available yet, retrying with delay."); traced_sleep(RETRY_DELAY, SleepReason::RetryBackoff).await; - continue; } - }; - - let receipts: Vec = serde_json::from_value(receipts).expect("cannot fail to deserialize external receipts"); - - return (block, receipts); - } -} - -#[allow(clippy::expect_used)] -#[tracing::instrument(name = "importer::fetch_block", skip_all, fields(block_number))] -async fn fetch_block(chain: Arc, block_number: BlockNumber) -> ExternalBlock { - const RETRY_DELAY: Duration = Duration::from_millis(10); - Span::with(|s| { - s.rec_str("block_number", &block_number); - }); - - loop { - tracing::info!(%block_number, "fetching block"); - let block = match chain.fetch_block(block_number).await { - Ok(json) => json, Err(e) => { - tracing::warn!(reason = ?e, %block_number, delay_ms=%RETRY_DELAY.as_millis(), "failed to retrieve block. retrying with delay."); + tracing::warn!(reason = ?e, %block_number, delay_ms = %RETRY_DELAY.as_millis(), "failed to fetch block and receipts, retrying with delay."); traced_sleep(RETRY_DELAY, SleepReason::RetryBackoff).await; - continue; } }; - - if block.is_null() { - tracing::warn!(%block_number, delay_ms=%RETRY_DELAY.as_millis(), "block not mined yet. retrying with delay."); - traced_sleep(RETRY_DELAY, SleepReason::SyncData).await; - continue; - } - - return ExternalBlock::deserialize(&block).expect("cannot fail to deserialize external block"); - } -} - -#[tracing::instrument(name = "importer::fetch_receipt", skip_all, fields(block_number, tx_hash))] -async fn fetch_receipt(chain: Arc, block_number: BlockNumber, tx_hash: Hash) -> ExternalReceipt { - Span::with(|s| { - s.rec_str("block_number", &block_number); - s.rec_str("tx_hash", &tx_hash); - }); - - loop { - tracing::info!(%block_number, %tx_hash, "fetching receipt"); - - match chain.fetch_receipt(tx_hash).await { - Ok(Some(receipt)) => return receipt, - Ok(None) => { - tracing::warn!(%block_number, %tx_hash, "receipt not available yet because block is not mined. retrying now."); - continue; - } - Err(e) => { - tracing::error!(reason = ?e, %block_number, %tx_hash, "failed to fetch receipt. retrying now."); - } - } } } diff --git a/src/eth/primitives/external_block_with_receipts.rs b/src/eth/primitives/external_block_with_receipts.rs new file mode 100644 index 000000000..adfee6540 --- /dev/null +++ b/src/eth/primitives/external_block_with_receipts.rs @@ -0,0 +1,27 @@ +use serde::Deserialize; + +use crate::alias::JsonValue; +use crate::eth::primitives::ExternalBlock; +use crate::eth::primitives::ExternalReceipt; +use crate::log_and_err; + +#[derive(Debug, Clone, Deserialize)] +pub struct ExternalBlockWithReceipts { + pub block: ExternalBlock, + pub receipts: Vec, +} + +// ----------------------------------------------------------------------------- +// Conversions: Other -> Self +// ----------------------------------------------------------------------------- + +impl TryFrom for ExternalBlockWithReceipts { + type Error = anyhow::Error; + + fn try_from(value: JsonValue) -> Result { + match ExternalBlockWithReceipts::deserialize(&value) { + Ok(v) => Ok(v), + Err(e) => log_and_err!(reason = e, payload = value, "failed to convert payload value to ExternalBlockWithReceipts"), + } + } +} diff --git a/src/eth/primitives/mod.rs b/src/eth/primitives/mod.rs index 56c20efb7..a47e4f5a1 100644 --- a/src/eth/primitives/mod.rs +++ b/src/eth/primitives/mod.rs @@ -18,6 +18,7 @@ mod execution_metrics; mod execution_result; mod execution_value_change; mod external_block; +mod external_block_with_receipts; mod external_receipt; mod external_receipts; mod external_transaction; @@ -73,6 +74,7 @@ pub use execution_metrics::EvmExecutionMetrics; pub use execution_result::ExecutionResult; pub use execution_value_change::ExecutionValueChange; pub use external_block::ExternalBlock; +pub use external_block_with_receipts::ExternalBlockWithReceipts; pub use external_receipt::ExternalReceipt; pub use external_receipts::ExternalReceipts; pub use external_transaction::ExternalTransaction; diff --git a/src/infra/blockchain_client/blockchain_client.rs b/src/infra/blockchain_client/blockchain_client.rs index 9986c29ca..3176e5a00 100644 --- a/src/infra/blockchain_client/blockchain_client.rs +++ b/src/infra/blockchain_client/blockchain_client.rs @@ -18,6 +18,7 @@ use crate::alias::JsonValue; use crate::eth::primitives::Address; use crate::eth::primitives::BlockNumber; use crate::eth::primitives::ExternalBlock; +use crate::eth::primitives::ExternalBlockWithReceipts; use crate::eth::primitives::ExternalReceipt; use crate::eth::primitives::Hash; use crate::eth::primitives::StratusError; @@ -146,22 +147,30 @@ impl BlockchainClient { } /// Fetches a block by number with receipts. - pub async fn fetch_block_and_receipts(&self, block_number: BlockNumber) -> anyhow::Result { + pub async fn fetch_block_and_receipts(&self, block_number: BlockNumber) -> anyhow::Result> { tracing::debug!(%block_number, "fetching block"); let number = to_json_value(block_number); - match self.http.request::("stratus_getBlockAndReceipts", [number]).await { - Ok(json) => Ok(json), + let result = self + .http + .request::, _>("stratus_getBlockAndReceipts", [number]) + .await; + + match result { + Ok(block) => Ok(block), Err(e) => log_and_err!(reason = e, "failed to fetch block with receipts"), } } /// Fetches a block by number. - pub async fn fetch_block(&self, block_number: BlockNumber) -> anyhow::Result { + pub async fn fetch_block(&self, block_number: BlockNumber) -> anyhow::Result> { tracing::debug!(%block_number, "fetching block"); let number = to_json_value(block_number); - let result = self.http.request::("eth_getBlockByNumber", [number, JsonValue::Bool(true)]).await; + let result = self + .http + .request::, _>("eth_getBlockByNumber", [number, JsonValue::Bool(true)]) + .await; match result { Ok(block) => Ok(block), @@ -174,7 +183,6 @@ impl BlockchainClient { tracing::debug!(%tx_hash, "fetching transaction"); let hash = to_json_value(tx_hash); - let result = self.http.request::, _>("eth_getTransactionByHash", [hash]).await; match result {