Skip to content

Commit

Permalink
enha: remove temporary fallback importer block and receipts fetch (#2005
Browse files Browse the repository at this point in the history
)

* enha: remove temporary importer flow

* chore: improve

* chore: improve

* chore: log

* chore: simplify test

* chore: improve

* chore: simplify

* chore: doc
  • Loading branch information
gabriel-aranha-cw authored Feb 7, 2025
1 parent 03a7a8d commit 05e6001
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 47 deletions.
37 changes: 37 additions & 0 deletions e2e/test/external/e2e-json-rpc.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,43 @@ describe("JSON-RPC", () => {
expect(block).to.be.null;
});
});
describe("stratus_getBlockAndReceipts", () => {
it("fetches block with receipt", async () => {
await sendReset();

// Send a transaction
const amount = 1;
const nonce = await sendGetNonce(ALICE);
const signedTx = await ALICE.signWeiTransfer(BOB.address, amount, nonce);
const txHash = keccak256(signedTx);
await sendRawTransaction(signedTx);
await sendEvmMine();

// Get block number and hash
const blockNumber = await send("eth_blockNumber");
const block = await send("eth_getBlockByNumber", [blockNumber, true]);
const blockHash = block.hash;

// Get individual block and receipt
const individualBlock = await send("eth_getBlockByHash", [blockHash, true]);
const individualReceipt = await send("eth_getTransactionReceipt", [txHash, true]);

// Get block and receipts using stratus endpoint
const response = await send("stratus_getBlockAndReceipts", [blockHash]);

// Validate block
expect(response.block).to.not.be.null;
expect(response.block).to.deep.equal(individualBlock);

// Validate receipt
expect(response.receipts).to.have.length(1);
const combinedReceipt = response.receipts[0];
const safeIndividualReceipt = individualReceipt!;

// Compare receipt fields
expect(combinedReceipt).to.deep.equal(safeIndividualReceipt);
});
});
it("eth_getUncleByBlockHashAndIndex", async function () {
if (isStratus) {
(await sendExpect("eth_getUncleByBlockHashAndIndex", [ZERO, ZERO])).to.be.null;
Expand Down
80 changes: 36 additions & 44 deletions src/eth/follower/importer/importer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,9 @@ fn set_external_rpc_current_block(new_number: BlockNumber) {
/// Number of blocks that are downloaded in parallel.
const PARALLEL_BLOCKS: usize = 3;

/// Number of receipts that are downloaded in parallel.
const PARALLEL_RECEIPTS: usize = 100;

/// Timeout awaiting for newHeads event before fallback to polling.
const TIMEOUT_NEW_HEADS: Duration = Duration::from_millis(2000);

/// Interval before we starting retrieving receipts because they are not immediately available after the block is retrieved.
const INTERVAL_FETCH_RECEIPTS: Duration = Duration::from_millis(50);

pub struct Importer {
executor: Arc<Executor>,

Expand Down Expand Up @@ -481,58 +475,56 @@ impl Importer {
// -----------------------------------------------------------------------------
// Helpers
// -----------------------------------------------------------------------------

#[allow(clippy::expect_used)]
#[tracing::instrument(name = "importer::fetch_block_and_receipts", skip_all, fields(block_number))]
async fn fetch_block_and_receipts(chain: Arc<BlockchainClient>, block_number: BlockNumber) -> (ExternalBlock, Vec<ExternalReceipt>) {
const RETRY_DELAY: Duration = Duration::from_millis(10);
Span::with(|s| {
s.rec_str("block_number", &block_number);
});

async fn try_reading_block_and_receipts_with_temporary_endpoint(
chain: Arc<BlockchainClient>,
block_number: BlockNumber,
) -> Option<(ExternalBlock, Vec<ExternalReceipt>)> {
let mut json = chain.fetch_block_and_receipts_with_temporary_endpoint(block_number).await.ok()?;

let block = mem::take(json.get_mut("block")?);
let block: ExternalBlock = serde_json::from_value(block).ok()?;

let receipts = mem::take(json.get_mut("receipts")?);
let receipts: Vec<ExternalReceipt> = serde_json::from_value(receipts).ok()?;
loop {
tracing::info!(%block_number, "fetching block and receipts");

Some((block, receipts))
}
let mut json = match chain.fetch_block_and_receipts(block_number).await {
Ok(json) => json,
Err(e) => {
tracing::warn!(reason = ?e, %block_number, delay_ms = %RETRY_DELAY.as_millis(), "failed to fetch block and receipts, retrying with delay.");
traced_sleep(RETRY_DELAY, SleepReason::RetryBackoff).await;
continue;
}
};

if let Some(res) = try_reading_block_and_receipts_with_temporary_endpoint(Arc::clone(&chain), block_number).await {
tracing::info!("successfully imported block and receipts using endpoint stratus_getBlockAndReceipts");
return res;
} else {
tracing::warn!("failed to import block and receipts with endpoint stratus_getBlockAndReceipts, falling back to get block + get each receipt");
}
let block = match json.get_mut("block") {
Some(block) => mem::take(block),
None => {
tracing::warn!(%block_number, delay_ms = %RETRY_DELAY.as_millis(), "missing block in response, retrying with delay.");
traced_sleep(RETRY_DELAY, SleepReason::RetryBackoff).await;
continue;
}
};

// fetch block
let block = fetch_block(Arc::clone(&chain), block_number).await;
if block.is_null() {
tracing::warn!(%block_number, delay_ms = %RETRY_DELAY.as_millis(), "block not mined yet. retrying with delay.");
traced_sleep(RETRY_DELAY, SleepReason::SyncData).await;
continue;
}

// wait some time until receipts are available
let _ = traced_sleep(INTERVAL_FETCH_RECEIPTS, SleepReason::SyncData).await;
let block: ExternalBlock = serde_json::from_value(block).expect("cannot fail to deserialize external block");

// fetch receipts in parallel
let mut receipts_tasks = Vec::with_capacity(block.transactions.len());
let receipts = match json.get_mut("receipts") {
Some(receipts) => mem::take(receipts),
None => {
tracing::warn!(%block_number, delay_ms = %RETRY_DELAY.as_millis(), "missing receipts in response, retrying with delay.");
traced_sleep(RETRY_DELAY, SleepReason::RetryBackoff).await;
continue;
}
};

let tx_hashes = if let BlockTransactions::Full(txs) = &block.transactions {
txs.iter().map(|tx| tx.hash()).collect::<Vec<_>>()
} else {
tracing::error!("expected full transactions, got hashes or uncle");
return (block, Vec::new());
};
let receipts: Vec<ExternalReceipt> = serde_json::from_value(receipts).expect("cannot fail to deserialize external receipts");

for hash in tx_hashes {
receipts_tasks.push(fetch_receipt(Arc::clone(&chain), block_number, hash));
return (block, receipts);
}

let receipts = futures::stream::iter(receipts_tasks).buffer_unordered(PARALLEL_RECEIPTS).collect().await;

(block, receipts)
}

#[allow(clippy::expect_used)]
Expand Down
6 changes: 3 additions & 3 deletions src/infra/blockchain_client/blockchain_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,14 +145,14 @@ impl BlockchainClient {
}
}

/// Fetches a block by number.
pub async fn fetch_block_and_receipts_with_temporary_endpoint(&self, block_number: BlockNumber) -> anyhow::Result<JsonValue> {
/// Fetches a block by number with receipts.
pub async fn fetch_block_and_receipts(&self, block_number: BlockNumber) -> anyhow::Result<JsonValue> {
tracing::debug!(%block_number, "fetching block");

let number = to_json_value(block_number);
match self.http.request::<JsonValue, _>("stratus_getBlockAndReceipts", [number]).await {
Ok(json) => Ok(json),
Err(e) => log_and_err!(reason = e, "failed to fetch block by number"),
Err(e) => log_and_err!(reason = e, "failed to fetch block with receipts"),
}
}

Expand Down

0 comments on commit 05e6001

Please sign in to comment.