From 96c7cc3d5401cec8bae80d9f0efa13fad2e9c800 Mon Sep 17 00:00:00 2001 From: mertwole Date: Mon, 2 Sep 2024 07:37:18 +0000 Subject: [PATCH] Split in the other way --- relayer/src/message_relayer/common/era.rs | 281 +++++++++++++++++ .../common/era_tx_submitter.rs | 272 ---------------- relayer/src/message_relayer/common/mod.rs | 2 +- .../src/message_relayer/message_processor.rs | 297 ++---------------- 4 files changed, 302 insertions(+), 550 deletions(-) create mode 100644 relayer/src/message_relayer/common/era.rs delete mode 100644 relayer/src/message_relayer/common/era_tx_submitter.rs diff --git a/relayer/src/message_relayer/common/era.rs b/relayer/src/message_relayer/common/era.rs new file mode 100644 index 00000000..41a6f2c6 --- /dev/null +++ b/relayer/src/message_relayer/common/era.rs @@ -0,0 +1,281 @@ +use keccak_hash::keccak_256; +use std::collections::{btree_map::Entry, BTreeMap}; + +use ethereum_client::{EthApi, TxHash, TxStatus}; +use gear_rpc_client::{dto::Message, GearApi}; +use primitive_types::H256; +use prometheus::IntCounter; + +use utils_prometheus::impl_metered_service; + +use crate::message_relayer::MessageInBlock; + +use super::merkle_root_listener::RelayedMerkleRoot; + +type BlockNumber = u32; + +pub struct Era { + latest_merkle_root: Option, + messages: BTreeMap>, + pending_txs: Vec, + + metrics: EraMetrics, +} + +impl_metered_service! { + pub struct EraMetrics { + total_submitted_txs: IntCounter, + total_failed_txs: IntCounter, + total_failed_txs_because_processed: IntCounter, + } +} + +impl EraMetrics { + pub fn new() -> Self { + Self::new_inner().expect("Failed to create metrics") + } + + fn new_inner() -> prometheus::Result { + Ok(Self { + total_submitted_txs: IntCounter::new( + "message_relayer_message_processor_total_submitted_txs", + "Total amount of txs sent to ethereum", + )?, + total_failed_txs: IntCounter::new( + "message_relayer_message_processor_total_failed_txs", + "Total amount of txs sent to ethereum and failed", + )?, + total_failed_txs_because_processed: IntCounter::new( + "message_relayer_message_processor_total_failed_txs_because_processed", + "Amount of txs sent to ethereum and failed because they've already bee processed", + )?, + }) + } +} + +struct RelayMessagePendingTx { + hash: TxHash, + message_block: u32, + message: Message, +} + +impl Era { + pub fn new(metrics: EraMetrics) -> Self { + Self { + latest_merkle_root: None, + messages: BTreeMap::new(), + pending_txs: vec![], + + metrics, + } + } + + pub fn push_message(&mut self, message: MessageInBlock) { + match self.messages.entry(message.block) { + Entry::Occupied(mut entry) => { + entry.get_mut().push(message.message); + } + Entry::Vacant(entry) => { + entry.insert(vec![message.message]); + } + } + } + + pub fn push_merkle_root(&mut self, merkle_root: RelayedMerkleRoot) { + if let Some(mr) = self.latest_merkle_root.as_ref() { + if mr.gear_block < merkle_root.gear_block { + self.latest_merkle_root = Some(merkle_root); + } + } else { + self.latest_merkle_root = Some(merkle_root); + } + } + + pub async fn process(&mut self, gear_api: &GearApi, eth_api: &EthApi) -> anyhow::Result<()> { + let Some(latest_merkle_root) = self.latest_merkle_root else { + return Ok(()); + }; + + let mut processed_blocks = vec![]; + + for (&message_block, messages) in self.messages.iter() { + if message_block > latest_merkle_root.gear_block { + break; + } + + let merkle_root_block_hash = gear_api + .block_number_to_hash(latest_merkle_root.gear_block) + .await?; + + for message in messages { + let tx_hash = submit_message( + gear_api, + eth_api, + message, + latest_merkle_root.gear_block, + merkle_root_block_hash, + ) + .await?; + + self.metrics.total_submitted_txs.inc(); + + self.pending_txs.push(RelayMessagePendingTx { + hash: tx_hash, + message_block, + message: message.clone(), + }); + } + + processed_blocks.push(message_block); + } + + for block in processed_blocks { + self.messages.remove_entry(&block); + } + + Ok(()) + } + + pub async fn try_finalize( + &mut self, + eth_api: &EthApi, + gear_api: &GearApi, + ) -> anyhow::Result { + for i in (0..self.pending_txs.len()).rev() { + if self.try_finalize_tx(i, eth_api, gear_api).await? { + self.pending_txs.remove(i); + } + } + + Ok(self.pending_txs.is_empty()) + } + + async fn try_finalize_tx( + &mut self, + tx: usize, + eth_api: &EthApi, + gear_api: &GearApi, + ) -> anyhow::Result { + let tx = &mut self.pending_txs[tx]; + let status = eth_api.get_tx_status(tx.hash).await?; + + let nonce = H256::from(tx.message.nonce_le); + + match status { + TxStatus::Finalized => { + log::info!( + "Message at block #{} with nonce {} finalized", + tx.message_block, + nonce + ); + Ok(true) + } + TxStatus::Pending => { + log::info!( + "Tx for message at block #{} with nonce {} is waiting for finalization", + tx.message_block, + nonce + ); + Ok(false) + } + TxStatus::Failed => { + self.metrics.total_failed_txs.inc(); + + let already_processed = eth_api.is_message_processed(tx.message.nonce_le).await?; + + if already_processed { + self.metrics.total_failed_txs_because_processed.inc(); + return Ok(true); + } + + let merkle_root_block = self + .latest_merkle_root + .ok_or(anyhow::anyhow!( + "Cannot finalize era without any merkle roots" + ))? + .gear_block; + + if merkle_root_block < tx.message_block { + anyhow::bail!( + "Cannot relay message at block #{}: latest merkle root is at block #{}", + tx.message_block, + merkle_root_block + ); + } + + let merkle_root_block_hash = + gear_api.block_number_to_hash(merkle_root_block).await?; + + let tx_hash = submit_message( + gear_api, + eth_api, + &tx.message, + merkle_root_block, + merkle_root_block_hash, + ) + .await?; + + self.metrics.total_submitted_txs.inc(); + + log::warn!( + "Retrying to send failed tx {} for message #{}. New tx: {}", + hex::encode(tx.hash.0), + nonce, + hex::encode(tx_hash.0) + ); + + tx.hash = tx_hash; + + Ok(false) + } + } + } +} + +async fn submit_message( + gear_api: &GearApi, + eth_api: &EthApi, + message: &Message, + merkle_root_block: u32, + merkle_root_block_hash: H256, +) -> anyhow::Result { + let message_hash = message_hash(message); + + log::info!("Relaying message with hash {}", hex::encode(message_hash)); + + let proof = gear_api + .fetch_message_inclusion_merkle_proof(merkle_root_block_hash, message_hash.into()) + .await?; + + let tx_hash = eth_api + .provide_content_message( + merkle_root_block, + proof.num_leaves as u32, + proof.leaf_index as u32, + message.nonce_le, + message.source, + message.destination, + message.payload.to_vec(), + proof.proof, + ) + .await?; + + log::info!("Message #{:?} relaying started", message.nonce_le); + + Ok(tx_hash) +} + +fn message_hash(message: &Message) -> [u8; 32] { + let data = [ + message.nonce_le.as_ref(), + message.source.as_ref(), + message.destination.as_ref(), + message.payload.as_ref(), + ] + .concat(); + + let mut hash = [0; 32]; + keccak_256(&data, &mut hash); + + hash +} diff --git a/relayer/src/message_relayer/common/era_tx_submitter.rs b/relayer/src/message_relayer/common/era_tx_submitter.rs deleted file mode 100644 index 6c6c7a14..00000000 --- a/relayer/src/message_relayer/common/era_tx_submitter.rs +++ /dev/null @@ -1,272 +0,0 @@ -use keccak_hash::keccak_256; -use std::{ - collections::{btree_map::Entry, BTreeMap, HashSet}, - sync::mpsc::Receiver, -}; - -use ethereum_client::{EthApi, TxHash, TxStatus}; -use gear_rpc_client::{dto::Message, GearApi}; -use primitive_types::H256; -use prometheus::{Gauge, IntCounter, IntGauge}; - -use utils_prometheus::{impl_metered_service, MeteredService}; - -use super::merkle_root_listener::RelayedMerkleRoot; - -type BlockNumber = u32; - -struct EraTxSubmitter { - latest_merkle_root: Option, - messages: BTreeMap>, - pending_txs: Vec, - - eth_api: EthApi, - gear_api: GearApi, -} - -struct PendingTx { - hash: TxHash, - message_block: u32, - message: Message, -} - -pub struct MessageInBlock { - pub message: Message, - pub block: BlockNumber, -} - -impl EraTxSubmitter { - pub fn new(eth_api: EthApi, gear_api: GearApi) -> Self { - Self { - latest_merkle_root: None, - messages: Default::default(), - pending_txs: vec![], - - eth_api, - gear_api, - } - } - - pub async fn run( - mut self, - messages: Receiver, - merkle_roots: Receiver, - ) { - loop { - let res = self.run_inner(&messages, &merkle_roots).await; - if let Err(err) = res { - log::error!("Ethereum tx submitter failed: {}", err); - } - } - } - - async fn run_inner( - &mut self, - messages: &Receiver, - merkle_roots: &Receiver, - ) -> anyhow::Result<()> { - loop { - for merkle_root in merkle_roots.try_iter() { - if let Some(mr) = self.latest_merkle_root.as_ref() { - if mr.gear_block < merkle_root.gear_block { - self.latest_merkle_root = Some(merkle_root); - } - } else { - self.latest_merkle_root = Some(merkle_root); - } - } - - for message in messages.try_iter() { - match self.messages.entry(message.block) { - Entry::Occupied(mut entry) => { - entry.get_mut().push(message.message); - } - Entry::Vacant(entry) => { - entry.insert(vec![message.message]); - } - } - } - - self.submit_queued_messages().await?; - - for i in (0..self.pending_txs.len()).rev() { - if self.try_finalize_tx(i).await? { - self.pending_txs.remove(i); - } - } - } - } - - async fn submit_queued_messages(&mut self) -> anyhow::Result<()> { - let Some(latest_merkle_root) = self.latest_merkle_root else { - return Ok(()); - }; - - let mut processed_blocks = vec![]; - - for (&message_block, messages) in self.messages.iter() { - if message_block > latest_merkle_root.gear_block { - break; - } - - let merkle_root_block_hash = self - .gear_api - .block_number_to_hash(latest_merkle_root.gear_block) - .await?; - - for message in messages { - let tx_hash = self - .submit_message( - message, - latest_merkle_root.gear_block, - merkle_root_block_hash, - ) - .await?; - - //self.metrics.total_submitted_txs.inc(); - - self.pending_txs.push(PendingTx { - hash: tx_hash, - message_block, - message: message.clone(), - }); - } - - processed_blocks.push(message_block); - } - - for block in processed_blocks { - self.messages.remove_entry(&block); - } - - Ok(()) - } - - async fn try_finalize_tx(&mut self, tx: usize) -> anyhow::Result { - let tx_message = self.pending_txs[tx].message.clone(); - let tx_message_block = self.pending_txs[tx].message_block; - let tx_hash = self.pending_txs[tx].hash; - - let status = self.eth_api.get_tx_status(tx_hash).await?; - - let nonce = H256::from(tx_message.nonce_le); - - match status { - TxStatus::Finalized => { - log::info!( - "Message at block #{} with nonce {} finalized", - tx_message_block, - nonce - ); - Ok(true) - } - TxStatus::Pending => { - log::info!( - "Tx for message at block #{} with nonce {} is waiting for finalization", - tx_message_block, - nonce - ); - Ok(false) - } - TxStatus::Failed => { - //self.metrics.total_failed_txs.inc(); - - let already_processed = self - .eth_api - .is_message_processed(tx_message.nonce_le) - .await?; - - if already_processed { - //self.metrics.total_failed_txs_because_processed.inc(); - return Ok(true); - } - - let merkle_root_block = self - .latest_merkle_root - .ok_or(anyhow::anyhow!( - "Cannot finalize era without any merkle roots" - ))? - .gear_block; - - if merkle_root_block < tx_message_block { - anyhow::bail!( - "Cannot relay message at block #{}: latest merkle root is at block #{}", - tx_message_block, - merkle_root_block - ); - } - - let merkle_root_block_hash = self - .gear_api - .block_number_to_hash(merkle_root_block) - .await?; - - let new_tx_hash = self - .submit_message(&tx_message, merkle_root_block, merkle_root_block_hash) - .await?; - - //self.metrics.total_submitted_txs.inc(); - - log::warn!( - "Retrying to send failed tx {} for message #{}. New tx: {}", - hex::encode(tx_hash.0), - nonce, - hex::encode(new_tx_hash.0) - ); - - self.pending_txs[tx].hash = new_tx_hash; - - Ok(false) - } - } - } - - async fn submit_message( - &self, - message: &Message, - merkle_root_block: u32, - merkle_root_block_hash: H256, - ) -> anyhow::Result { - let message_hash = message_hash(message); - - log::info!("Relaying message with hash {}", hex::encode(message_hash)); - - let proof = self - .gear_api - .fetch_message_inclusion_merkle_proof(merkle_root_block_hash, message_hash.into()) - .await?; - - let tx_hash = self - .eth_api - .provide_content_message( - merkle_root_block, - proof.num_leaves as u32, - proof.leaf_index as u32, - message.nonce_le, - message.source, - message.destination, - message.payload.to_vec(), - proof.proof, - ) - .await?; - - log::info!("Message #{:?} relaying started", message.nonce_le); - - Ok(tx_hash) - } -} - -fn message_hash(message: &Message) -> [u8; 32] { - let data = [ - message.nonce_le.as_ref(), - message.source.as_ref(), - message.destination.as_ref(), - message.payload.as_ref(), - ] - .concat(); - - let mut hash = [0; 32]; - keccak_256(&data, &mut hash); - - hash -} diff --git a/relayer/src/message_relayer/common/mod.rs b/relayer/src/message_relayer/common/mod.rs index aeb92abd..f789ecc8 100644 --- a/relayer/src/message_relayer/common/mod.rs +++ b/relayer/src/message_relayer/common/mod.rs @@ -1,2 +1,2 @@ -pub mod era_tx_submitter; +pub mod era; pub mod merkle_root_listener; diff --git a/relayer/src/message_relayer/message_processor.rs b/relayer/src/message_relayer/message_processor.rs index 962a3517..4e8ada70 100644 --- a/relayer/src/message_relayer/message_processor.rs +++ b/relayer/src/message_relayer/message_processor.rs @@ -1,18 +1,20 @@ -use keccak_hash::keccak_256; use std::{ collections::{btree_map::Entry, BTreeMap, HashSet}, sync::mpsc::Receiver, }; -use ethereum_client::{EthApi, TxHash, TxStatus}; -use gear_rpc_client::{dto::Message, GearApi}; -use primitive_types::H256; -use prometheus::{Gauge, IntCounter, IntGauge}; +use ethereum_client::EthApi; +use gear_rpc_client::GearApi; +use prometheus::{Gauge, IntGauge}; use utils_prometheus::{impl_metered_service, MeteredService}; use super::{ - common::merkle_root_listener::RelayedMerkleRoot, AuthoritySetId, BlockEvent, BlockNumber, + common::{ + era::{Era, EraMetrics}, + merkle_root_listener::RelayedMerkleRoot, + }, + AuthoritySetId, BlockEvent, }; pub struct MessageProcessor { @@ -23,51 +25,6 @@ pub struct MessageProcessor { era_metrics: EraMetrics, } -struct Era { - latest_merkle_root: Option, - messages: BTreeMap>, - pending_txs: Vec, - - metrics: EraMetrics, -} - -impl_metered_service! { - struct EraMetrics { - total_submitted_txs: IntCounter, - total_failed_txs: IntCounter, - total_failed_txs_because_processed: IntCounter, - } -} - -impl EraMetrics { - fn new() -> Self { - Self::new_inner().expect("Failed to create metrics") - } - - fn new_inner() -> prometheus::Result { - Ok(Self { - total_submitted_txs: IntCounter::new( - "message_relayer_message_processor_total_submitted_txs", - "Total amount of txs sent to ethereum", - )?, - total_failed_txs: IntCounter::new( - "message_relayer_message_processor_total_failed_txs", - "Total amount of txs sent to ethereum and failed", - )?, - total_failed_txs_because_processed: IntCounter::new( - "message_relayer_message_processor_total_failed_txs_because_processed", - "Amount of txs sent to ethereum and failed because they've already bee processed", - )?, - }) - } -} - -struct RelayMessagePendingTx { - hash: TxHash, - message_block: u32, - message: Message, -} - impl_metered_service! { struct Metrics { pending_tx_count: IntGauge, @@ -133,8 +90,6 @@ impl MessageProcessor { ) -> anyhow::Result<()> { let mut eras: BTreeMap = BTreeMap::new(); - let mut paid_messages = HashSet::new(); - loop { let fee_payer_balance = self.eth_api.get_approx_balance().await?; self.metrics.fee_payer_balance.set(fee_payer_balance); @@ -149,30 +104,18 @@ impl MessageProcessor { match eras.entry(authority_set_id) { Entry::Occupied(mut entry) => { - match entry.get_mut().messages.entry(message.block) { - Entry::Occupied(mut entry) => { - entry.get_mut().push(message.message); - } - Entry::Vacant(entry) => { - entry.insert(vec![message.message]); - } - } + entry.get_mut().push_message(message); } Entry::Vacant(entry) => { - let mut messages = BTreeMap::new(); - messages.insert(message.block, vec![message.message]); + let mut era = Era::new(self.era_metrics.clone()); + era.push_message(message); - entry.insert(Era { - latest_merkle_root: None, - messages, - pending_txs: vec![], - metrics: self.era_metrics.clone(), - }); + entry.insert(era); } } } BlockEvent::MessagePaid { nonce } => { - paid_messages.insert(nonce); + todo!(); } } } @@ -180,23 +123,13 @@ impl MessageProcessor { for new_merkle_root in merkle_roots.try_iter() { match eras.entry(new_merkle_root.authority_set_id) { Entry::Occupied(mut entry) => { - let era = entry.get_mut(); - - if let Some(mr) = era.latest_merkle_root.as_ref() { - if mr.gear_block < new_merkle_root.gear_block { - era.latest_merkle_root = Some(new_merkle_root); - } - } else { - era.latest_merkle_root = Some(new_merkle_root); - } + entry.get_mut().push_merkle_root(new_merkle_root); } Entry::Vacant(entry) => { - entry.insert(Era { - latest_merkle_root: Some(new_merkle_root), - messages: BTreeMap::new(), - pending_txs: vec![], - metrics: self.era_metrics.clone(), - }); + let mut era = Era::new(self.era_metrics.clone()); + era.push_merkle_root(new_merkle_root); + + entry.insert(era); } } } @@ -224,8 +157,8 @@ impl MessageProcessor { } } - let pending_tx_count: usize = eras.iter().map(|era| era.1.pending_txs.len()).sum(); - self.metrics.pending_tx_count.set(pending_tx_count as i64); + //let pending_tx_count: usize = eras.iter().map(|era| era.1.pending_txs.len()).sum(); + //self.metrics.pending_tx_count.set(pending_tx_count as i64); for finalized in finalized_eras { eras.remove(&finalized); @@ -233,193 +166,3 @@ impl MessageProcessor { } } } - -impl Era { - pub async fn process(&mut self, gear_api: &GearApi, eth_api: &EthApi) -> anyhow::Result<()> { - let Some(latest_merkle_root) = self.latest_merkle_root else { - return Ok(()); - }; - - let mut processed_blocks = vec![]; - - for (&message_block, messages) in self.messages.iter() { - if message_block > latest_merkle_root.gear_block { - break; - } - - let merkle_root_block_hash = gear_api - .block_number_to_hash(latest_merkle_root.gear_block) - .await?; - - for message in messages { - let tx_hash = submit_message( - gear_api, - eth_api, - message, - latest_merkle_root.gear_block, - merkle_root_block_hash, - ) - .await?; - - self.metrics.total_submitted_txs.inc(); - - self.pending_txs.push(RelayMessagePendingTx { - hash: tx_hash, - message_block, - message: message.clone(), - }); - } - - processed_blocks.push(message_block); - } - - for block in processed_blocks { - self.messages.remove_entry(&block); - } - - Ok(()) - } - - pub async fn try_finalize( - &mut self, - eth_api: &EthApi, - gear_api: &GearApi, - ) -> anyhow::Result { - for i in (0..self.pending_txs.len()).rev() { - if self.try_finalize_tx(i, eth_api, gear_api).await? { - self.pending_txs.remove(i); - } - } - - Ok(self.pending_txs.is_empty()) - } - - async fn try_finalize_tx( - &mut self, - tx: usize, - eth_api: &EthApi, - gear_api: &GearApi, - ) -> anyhow::Result { - let tx = &mut self.pending_txs[tx]; - let status = eth_api.get_tx_status(tx.hash).await?; - - let nonce = H256::from(tx.message.nonce_le); - - match status { - TxStatus::Finalized => { - log::info!( - "Message at block #{} with nonce {} finalized", - tx.message_block, - nonce - ); - Ok(true) - } - TxStatus::Pending => { - log::info!( - "Tx for message at block #{} with nonce {} is waiting for finalization", - tx.message_block, - nonce - ); - Ok(false) - } - TxStatus::Failed => { - self.metrics.total_failed_txs.inc(); - - let already_processed = eth_api.is_message_processed(tx.message.nonce_le).await?; - - if already_processed { - self.metrics.total_failed_txs_because_processed.inc(); - return Ok(true); - } - - let merkle_root_block = self - .latest_merkle_root - .ok_or(anyhow::anyhow!( - "Cannot finalize era without any merkle roots" - ))? - .gear_block; - - if merkle_root_block < tx.message_block { - anyhow::bail!( - "Cannot relay message at block #{}: latest merkle root is at block #{}", - tx.message_block, - merkle_root_block - ); - } - - let merkle_root_block_hash = - gear_api.block_number_to_hash(merkle_root_block).await?; - - let tx_hash = submit_message( - gear_api, - eth_api, - &tx.message, - merkle_root_block, - merkle_root_block_hash, - ) - .await?; - - self.metrics.total_submitted_txs.inc(); - - log::warn!( - "Retrying to send failed tx {} for message #{}. New tx: {}", - hex::encode(tx.hash.0), - nonce, - hex::encode(tx_hash.0) - ); - - tx.hash = tx_hash; - - Ok(false) - } - } - } -} - -async fn submit_message( - gear_api: &GearApi, - eth_api: &EthApi, - message: &Message, - merkle_root_block: u32, - merkle_root_block_hash: H256, -) -> anyhow::Result { - let message_hash = message_hash(message); - - log::info!("Relaying message with hash {}", hex::encode(message_hash)); - - let proof = gear_api - .fetch_message_inclusion_merkle_proof(merkle_root_block_hash, message_hash.into()) - .await?; - - let tx_hash = eth_api - .provide_content_message( - merkle_root_block, - proof.num_leaves as u32, - proof.leaf_index as u32, - message.nonce_le, - message.source, - message.destination, - message.payload.to_vec(), - proof.proof, - ) - .await?; - - log::info!("Message #{:?} relaying started", message.nonce_le); - - Ok(tx_hash) -} - -fn message_hash(message: &Message) -> [u8; 32] { - let data = [ - message.nonce_le.as_ref(), - message.source.as_ref(), - message.destination.as_ref(), - message.payload.as_ref(), - ] - .concat(); - - let mut hash = [0; 32]; - keccak_256(&data, &mut hash); - - hash -}