From dfd9e9fda73603eab06656d163c178cc84035433 Mon Sep 17 00:00:00 2001 From: Trevor Porter Date: Mon, 10 Feb 2025 19:02:47 +0000 Subject: [PATCH] feat: graceful and alertable relayer reorg detection (#5401) ### Description - Only panics in the validator if there is a reorg detected - Mostly solves https://github.com/hyperlane-xyz/issues/issues/1394 (does not have super clear alert conditions for a reorg occurring, this should come later) - Whereas before we would use `Result>` as a return type for metadata building, where the Result indicates some error occurred (e.g. an RPC issue or something), Ok(Some(_)) meant that metadata building was successful, and Ok(None) meant that no error occurred but simply that fetching metadata wasn't successful -- we now have a new `Metadata` enum: ``` #[derive(Clone, Debug)] pub enum Metadata { /// Able to fetch metadata Ok(Vec), /// Unable to fetch metadata, but no error occurred CouldNotFetch, /// While building metadata, encountered something that should /// prohibit all metadata for the message from being built. /// Provides the reason for the refusal. Refused(String), } ``` - When we are building metadata and encounter a reorg flag in a validator that's used by the message, we handle this and return `Metadata::MetadataBuildingRefused`. The aggregation ISM will propagate this to make sure that we have end behavior of flatly refusing to build metadata for a message even if a nested metadata builder encounters this. If this happens, a new reprepare reason `MessageMetadataRefused` is introduced that's used. - e2e test is added to confirm this behavior ### Drive-by changes ### Related issues ### Backward compatibility ### Testing e2e, some unit --------- Co-authored-by: Daniel Savu <23065004+daniel-savu@users.noreply.github.com> --- .../relayer/src/msg/metadata/aggregation.rs | 28 ++++-- .../agents/relayer/src/msg/metadata/base.rs | 35 +++++-- .../relayer/src/msg/metadata/ccip_read.rs | 18 ++-- .../agents/relayer/src/msg/metadata/mod.rs | 4 +- .../relayer/src/msg/metadata/multisig/base.rs | 30 +++--- .../relayer/src/msg/metadata/null_metadata.rs | 6 +- .../relayer/src/msg/metadata/routing.rs | 8 +- rust/main/agents/relayer/src/msg/op_queue.rs | 6 +- .../agents/relayer/src/msg/pending_message.rs | 22 +++-- .../relayer/src/server/message_retry.rs | 2 +- rust/main/agents/validator/src/validator.rs | 5 +- .../src/settings/checkpoint_syncer.rs | 91 +++++++++---------- .../src/traits/pending_operation.rs | 3 + rust/main/hyperlane-core/src/types/reorg.rs | 2 +- rust/main/utils/run-locally/src/main.rs | 70 +++++++++++++- .../infra/scripts/send-test-messages.ts | 31 ++++++- 16 files changed, 242 insertions(+), 119 deletions(-) diff --git a/rust/main/agents/relayer/src/msg/metadata/aggregation.rs b/rust/main/agents/relayer/src/msg/metadata/aggregation.rs index 1d4ecbc732..448796162c 100644 --- a/rust/main/agents/relayer/src/msg/metadata/aggregation.rs +++ b/rust/main/agents/relayer/src/msg/metadata/aggregation.rs @@ -9,7 +9,7 @@ use tracing::{info, instrument}; use hyperlane_core::{HyperlaneMessage, InterchainSecurityModule, ModuleType, H256, U256}; -use super::{MessageMetadataBuilder, MetadataBuilder}; +use super::{MessageMetadataBuilder, Metadata, MetadataBuilder}; /// Bytes used to store one member of the (start, end) range tuple /// Copied from `AggregationIsmMetadata.sol` @@ -119,11 +119,7 @@ impl AggregationIsmMetadataBuilder { impl MetadataBuilder for AggregationIsmMetadataBuilder { #[instrument(err, skip(self, message), ret)] #[allow(clippy::blocks_in_conditions)] // TODO: `rustc` 1.80.1 clippy issue - async fn build( - &self, - ism_address: H256, - message: &HyperlaneMessage, - ) -> eyre::Result>> { + async fn build(&self, ism_address: H256, message: &HyperlaneMessage) -> eyre::Result { const CTX: &str = "When fetching AggregationIsm metadata"; let ism = self.build_aggregation_ism(ism_address).await.context(CTX)?; let (ism_addresses, threshold) = ism.modules_and_threshold(message).await.context(CTX)?; @@ -136,6 +132,18 @@ impl MetadataBuilder for AggregationIsmMetadataBuilder { ) .await; + // If any inner ISMs are refusing to build metadata, we propagate just the first refusal. + if let Some(first_refusal) = sub_modules_and_metas.iter().find_map(|result| { + result.as_ref().ok().and_then(|sub_module_and_meta| { + match &sub_module_and_meta.metadata { + Metadata::Refused(reason) => Some(Metadata::Refused(reason.clone())), + _ => None, + } + }) + }) { + return Ok(first_refusal); + } + // Partitions things into // 1. ok_sub_modules: ISMs with metadata with valid metadata // 2. err_sub_modules: ISMs with invalid metadata @@ -145,19 +153,21 @@ impl MetadataBuilder for AggregationIsmMetadataBuilder { .enumerate() .partition_map(|(index, (result, ism_address))| match result { Ok(sub_module_and_meta) => match sub_module_and_meta.metadata { - Some(metadata) => Either::Left(IsmAndMetadata::new( + Metadata::Found(metadata) => Either::Left(IsmAndMetadata::new( sub_module_and_meta.ism, index, metadata, )), - None => Either::Right((*ism_address, Some(sub_module_and_meta.module_type))), + _ => Either::Right((*ism_address, Some(sub_module_and_meta.module_type))), }, Err(_) => Either::Right((*ism_address, None)), }); let maybe_aggregation_metadata = Self::cheapest_valid_metas(ok_sub_modules, message, threshold, err_sub_modules) .await - .map(|mut metas| Self::format_metadata(&mut metas, ism_addresses.len())); + .map_or(Metadata::CouldNotFetch, |mut metas| { + Metadata::Found(Self::format_metadata(&mut metas, ism_addresses.len())) + }); Ok(maybe_aggregation_metadata) } } diff --git a/rust/main/agents/relayer/src/msg/metadata/base.rs b/rust/main/agents/relayer/src/msg/metadata/base.rs index 4bc55457f9..3f48d09bec 100644 --- a/rust/main/agents/relayer/src/msg/metadata/base.rs +++ b/rust/main/agents/relayer/src/msg/metadata/base.rs @@ -22,7 +22,10 @@ use crate::{ use async_trait::async_trait; use derive_new::new; use eyre::{Context, Result}; -use hyperlane_base::db::{HyperlaneDb, HyperlaneRocksDB}; +use hyperlane_base::{ + db::{HyperlaneDb, HyperlaneRocksDB}, + settings::CheckpointSyncerBuildError, +}; use hyperlane_base::{ settings::{ChainConf, CheckpointSyncerConf}, CheckpointSyncer, CoreMetrics, MultisigCheckpointSyncer, @@ -44,17 +47,28 @@ pub enum MetadataBuilderError { MaxDepthExceeded(u32), } +#[derive(Clone, Debug)] +pub enum Metadata { + /// Able to fetch metadata + Found(Vec), + /// Unable to fetch metadata, but no error occurred + CouldNotFetch, + /// While building metadata, encountered something that should + /// prohibit all metadata for the message from being built. + /// Provides the reason for the refusal. + Refused(String), +} + #[derive(Debug)] pub struct IsmWithMetadataAndType { pub ism: Box, - pub metadata: Option>, + pub metadata: Metadata, pub module_type: ModuleType, } #[async_trait] pub trait MetadataBuilder: Send + Sync { - async fn build(&self, ism_address: H256, message: &HyperlaneMessage) - -> Result>>; + async fn build(&self, ism_address: H256, message: &HyperlaneMessage) -> Result; } /// Allows fetching the default ISM, caching the value for a period of time @@ -190,11 +204,7 @@ impl Deref for MessageMetadataBuilder { #[async_trait] impl MetadataBuilder for MessageMetadataBuilder { #[instrument(err, skip(self, message), fields(destination_domain=self.destination_domain().name()))] - async fn build( - &self, - ism_address: H256, - message: &HyperlaneMessage, - ) -> Result>> { + async fn build(&self, ism_address: H256, message: &HyperlaneMessage) -> Result { self.build_ism_and_metadata(ism_address, message) .await .map(|ism_with_metadata| ism_with_metadata.metadata) @@ -370,7 +380,7 @@ impl BaseMetadataBuilder { message: &HyperlaneMessage, validators: &[H256], app_context: Option, - ) -> Result { + ) -> Result { let storage_locations = self .origin_validator_announce .get_announced_storage_locations(validators) @@ -416,6 +426,11 @@ impl BaseMetadataBuilder { checkpoint_syncers.insert(validator.into(), checkpoint_syncer.into()); break; } + Err(CheckpointSyncerBuildError::ReorgEvent(reorg_event)) => { + // If a reorg event has been posted to a checkpoint syncer, + // we refuse to build + return Err(CheckpointSyncerBuildError::ReorgEvent(reorg_event)); + } Err(err) => { debug!( error=%err, diff --git a/rust/main/agents/relayer/src/msg/metadata/ccip_read.rs b/rust/main/agents/relayer/src/msg/metadata/ccip_read.rs index 49b15837df..10282da474 100644 --- a/rust/main/agents/relayer/src/msg/metadata/ccip_read.rs +++ b/rust/main/agents/relayer/src/msg/metadata/ccip_read.rs @@ -13,7 +13,7 @@ use serde::{Deserialize, Serialize}; use serde_json::json; use tracing::{info, instrument}; -use super::{base::MessageMetadataBuilder, MetadataBuilder}; +use super::{base::MessageMetadataBuilder, Metadata, MetadataBuilder}; #[derive(Serialize, Deserialize)] struct OffchainResponse { @@ -28,11 +28,7 @@ pub struct CcipReadIsmMetadataBuilder { #[async_trait] impl MetadataBuilder for CcipReadIsmMetadataBuilder { #[instrument(err, skip(self, message))] - async fn build( - &self, - ism_address: H256, - message: &HyperlaneMessage, - ) -> eyre::Result>> { + async fn build(&self, ism_address: H256, message: &HyperlaneMessage) -> eyre::Result { const CTX: &str = "When fetching CcipRead metadata"; let ism = self.build_ccip_read_ism(ism_address).await.context(CTX)?; @@ -42,15 +38,15 @@ impl MetadataBuilder for CcipReadIsmMetadataBuilder { let info: OffchainLookup = match response { Ok(_) => { info!("incorrectly configured getOffchainVerifyInfo, expected revert"); - return Ok(None); + return Ok(Metadata::CouldNotFetch); } Err(raw_error) => { let matching_regex = Regex::new(r"0x[[:xdigit:]]+")?; if let Some(matching) = &matching_regex.captures(&raw_error.to_string()) { OffchainLookup::decode(hex_decode(&matching[0][2..])?)? } else { - info!("unable to parse custom error out of revert"); - return Ok(None); + info!(?raw_error, "unable to parse custom error out of revert"); + return Ok(Metadata::CouldNotFetch); } } }; @@ -85,7 +81,7 @@ impl MetadataBuilder for CcipReadIsmMetadataBuilder { Ok(result) => { // remove leading 0x which hex_decode doesn't like let metadata = hex_decode(&result.data[2..])?; - return Ok(Some(metadata)); + return Ok(Metadata::Found(metadata)); } Err(_err) => { // try the next URL @@ -94,6 +90,6 @@ impl MetadataBuilder for CcipReadIsmMetadataBuilder { } // No metadata endpoints or endpoints down - Ok(None) + Ok(Metadata::CouldNotFetch) } } diff --git a/rust/main/agents/relayer/src/msg/metadata/mod.rs b/rust/main/agents/relayer/src/msg/metadata/mod.rs index 7ab1d892a6..1b727f0703 100644 --- a/rust/main/agents/relayer/src/msg/metadata/mod.rs +++ b/rust/main/agents/relayer/src/msg/metadata/mod.rs @@ -6,9 +6,9 @@ mod null_metadata; mod routing; use aggregation::AggregationIsmMetadataBuilder; -pub(crate) use base::MetadataBuilder; pub(crate) use base::{ - AppContextClassifier, BaseMetadataBuilder, IsmAwareAppContextClassifier, MessageMetadataBuilder, + AppContextClassifier, BaseMetadataBuilder, IsmAwareAppContextClassifier, + MessageMetadataBuilder, Metadata, MetadataBuilder, }; use ccip_read::CcipReadIsmMetadataBuilder; use null_metadata::NullMetadataBuilder; diff --git a/rust/main/agents/relayer/src/msg/metadata/multisig/base.rs b/rust/main/agents/relayer/src/msg/metadata/multisig/base.rs index 233f7b7f77..bdb2a20f12 100644 --- a/rust/main/agents/relayer/src/msg/metadata/multisig/base.rs +++ b/rust/main/agents/relayer/src/msg/metadata/multisig/base.rs @@ -6,6 +6,7 @@ use derive_new::new; use ethers::abi::Token; use eyre::{Context, Result}; +use hyperlane_base::settings::CheckpointSyncerBuildError; use hyperlane_base::MultisigCheckpointSyncer; use hyperlane_core::accumulator::merkle::Proof; use hyperlane_core::{HyperlaneMessage, MultisigSignedCheckpoint, H256}; @@ -14,7 +15,7 @@ use tracing::{debug, info}; use crate::msg::metadata::base::MessageMetadataBuilder; -use crate::msg::metadata::MetadataBuilder; +use crate::msg::metadata::{Metadata, MetadataBuilder}; #[derive(new, AsRef, Deref)] pub struct MultisigMetadata { @@ -93,11 +94,7 @@ pub trait MultisigIsmMetadataBuilder: AsRef + Send + Syn #[async_trait] impl MetadataBuilder for T { - async fn build( - &self, - ism_address: H256, - message: &HyperlaneMessage, - ) -> Result>> { + async fn build(&self, ism_address: H256, message: &HyperlaneMessage) -> Result { const CTX: &str = "When fetching MultisigIsm metadata"; let multisig_ism = self .as_ref() @@ -112,16 +109,27 @@ impl MetadataBuilder for T { if validators.is_empty() { info!("Could not fetch metadata: No validator set found for ISM"); - return Ok(None); + return Ok(Metadata::CouldNotFetch); } info!(hyp_message=?message, ?validators, threshold, "List of validators and threshold for message"); - let checkpoint_syncer = self + let checkpoint_syncer = match self .as_ref() .build_checkpoint_syncer(message, &validators, self.as_ref().app_context.clone()) .await - .context(CTX)?; + { + Ok(syncer) => syncer, + Err(CheckpointSyncerBuildError::ReorgEvent(reorg_event)) => { + return Ok(Metadata::Refused(format!( + "A reorg event occurred {:?}", + reorg_event + ))); + } + Err(e) => { + return Err(e).context(CTX); + } + }; if let Some(metadata) = self .fetch_metadata(&validators, threshold, message, &checkpoint_syncer) @@ -129,13 +137,13 @@ impl MetadataBuilder for T { .context(CTX)? { debug!(hyp_message=?message, ?metadata.checkpoint, "Found checkpoint with quorum"); - Ok(Some(self.format_metadata(metadata)?)) + Ok(Metadata::Found(self.format_metadata(metadata)?)) } else { info!( hyp_message=?message, ?validators, threshold, ism=%multisig_ism.address(), "Could not fetch metadata: Unable to reach quorum" ); - Ok(None) + Ok(Metadata::CouldNotFetch) } } } diff --git a/rust/main/agents/relayer/src/msg/metadata/null_metadata.rs b/rust/main/agents/relayer/src/msg/metadata/null_metadata.rs index 3793fe6b12..e271ff608a 100644 --- a/rust/main/agents/relayer/src/msg/metadata/null_metadata.rs +++ b/rust/main/agents/relayer/src/msg/metadata/null_metadata.rs @@ -1,4 +1,4 @@ -use super::MetadataBuilder; +use super::{Metadata, MetadataBuilder}; use async_trait::async_trait; use derive_new::new; use tracing::instrument; @@ -16,7 +16,7 @@ impl MetadataBuilder for NullMetadataBuilder { &self, _ism_address: H256, _message: &HyperlaneMessage, - ) -> eyre::Result>> { - Ok(Some(vec![])) + ) -> eyre::Result { + Ok(Metadata::Found(vec![])) } } diff --git a/rust/main/agents/relayer/src/msg/metadata/routing.rs b/rust/main/agents/relayer/src/msg/metadata/routing.rs index f8d5a145bc..f65389c67d 100644 --- a/rust/main/agents/relayer/src/msg/metadata/routing.rs +++ b/rust/main/agents/relayer/src/msg/metadata/routing.rs @@ -5,7 +5,7 @@ use eyre::Context; use hyperlane_core::{HyperlaneMessage, H256}; use tracing::instrument; -use super::{MessageMetadataBuilder, MetadataBuilder}; +use super::{MessageMetadataBuilder, Metadata, MetadataBuilder}; #[derive(Clone, Debug, new, Deref)] pub struct RoutingIsmMetadataBuilder { @@ -16,11 +16,7 @@ pub struct RoutingIsmMetadataBuilder { impl MetadataBuilder for RoutingIsmMetadataBuilder { #[instrument(err, skip(self, message), ret)] #[allow(clippy::blocks_in_conditions)] // TODO: `rustc` 1.80.1 clippy issue - async fn build( - &self, - ism_address: H256, - message: &HyperlaneMessage, - ) -> eyre::Result>> { + async fn build(&self, ism_address: H256, message: &HyperlaneMessage) -> eyre::Result { const CTX: &str = "When fetching RoutingIsm metadata"; let ism = self.build_routing_ism(ism_address).await.context(CTX)?; let module = ism.route(message).await.context(CTX)?; diff --git a/rust/main/agents/relayer/src/msg/op_queue.rs b/rust/main/agents/relayer/src/msg/op_queue.rs index 7fbc221cfd..8e2322ed9a 100644 --- a/rust/main/agents/relayer/src/msg/op_queue.rs +++ b/rust/main/agents/relayer/src/msg/op_queue.rs @@ -376,12 +376,12 @@ pub mod test { fn initialize_queue(broadcaster: &sync::broadcast::Sender) -> OpQueue { let (metrics, queue_metrics_label) = dummy_metrics_and_label(); - let op_queue = OpQueue::new( + + OpQueue::new( metrics.clone(), queue_metrics_label.clone(), Arc::new(Mutex::new(broadcaster.subscribe())), - ); - op_queue + ) } fn generate_test_messages( diff --git a/rust/main/agents/relayer/src/msg/pending_message.rs b/rust/main/agents/relayer/src/msg/pending_message.rs index dd502e4d03..134cf28760 100644 --- a/rust/main/agents/relayer/src/msg/pending_message.rs +++ b/rust/main/agents/relayer/src/msg/pending_message.rs @@ -25,7 +25,7 @@ use tracing::{debug, error, info, info_span, instrument, trace, warn, Instrument use super::{ gas_payment::{GasPaymentEnforcer, GasPolicyStatus}, - metadata::{BaseMetadataBuilder, MessageMetadataBuilder, MetadataBuilder}, + metadata::{BaseMetadataBuilder, MessageMetadataBuilder, Metadata, MetadataBuilder}, }; pub const CONFIRM_DELAY: Duration = if cfg!(any(test, feature = "test-utils")) { @@ -273,10 +273,20 @@ impl PendingOperation for PendingMessage { return self.on_reprepare(Some(err), ReprepareReason::ErrorBuildingMetadata); } }; - self.metadata = metadata.clone(); - let Some(metadata) = metadata else { - return self.on_reprepare::(None, ReprepareReason::CouldNotFetchMetadata); + let metadata_bytes = match metadata { + Metadata::Found(metadata_bytes) => { + self.metadata = Some(metadata_bytes.clone()); + metadata_bytes + } + Metadata::CouldNotFetch => { + return self.on_reprepare::(None, ReprepareReason::CouldNotFetchMetadata); + } + // If the metadata building is refused, we still allow it to be retried later. + Metadata::Refused(reason) => { + warn!(?reason, "Metadata building refused"); + return self.on_reprepare::(None, ReprepareReason::MessageMetadataRefused); + } }; // Estimate transaction costs for the process call. If there are issues, it's @@ -286,7 +296,7 @@ impl PendingOperation for PendingMessage { let tx_cost_estimate = match self .ctx .destination_mailbox - .process_estimate_costs(&self.message, &metadata) + .process_estimate_costs(&self.message, &metadata_bytes) .await { Ok(tx_cost_estimate) => tx_cost_estimate, @@ -334,7 +344,7 @@ impl PendingOperation for PendingMessage { } self.submission_data = Some(Box::new(MessageSubmissionData { - metadata, + metadata: metadata_bytes, gas_limit, })); PendingOperationResult::Success diff --git a/rust/main/agents/relayer/src/server/message_retry.rs b/rust/main/agents/relayer/src/server/message_retry.rs index 9207bc3523..a2fa56bc32 100644 --- a/rust/main/agents/relayer/src/server/message_retry.rs +++ b/rust/main/agents/relayer/src/server/message_retry.rs @@ -146,7 +146,7 @@ mod tests { if let Ok(req) = retry_request_receiver.recv().await { for (op, (evaluated, matched)) in pending_operations.iter().zip(metrics) { // Check that the list received by the server matches the pending operation - assert!(req.pattern.op_matches(&op)); + assert!(req.pattern.op_matches(op)); let resp = MessageRetryQueueResponse { evaluated, matched }; req.transmitter.send(resp).await.unwrap(); } diff --git a/rust/main/agents/validator/src/validator.rs b/rust/main/agents/validator/src/validator.rs index f599fa8e39..76dfbbd21f 100644 --- a/rust/main/agents/validator/src/validator.rs +++ b/rust/main/agents/validator/src/validator.rs @@ -77,10 +77,13 @@ impl BaseAgent for Validator { let (signer_instance, signer) = SingletonSigner::new(settings.validator.build().await?); let core = settings.build_hyperlane_core(metrics.clone()); + // Be extra sure to panic checkpoint syncer fails, which indicates + // a fatal startup error. let checkpoint_syncer = settings .checkpoint_syncer .build_and_validate(None) - .await? + .await + .expect("Failed to build checkpoint syncer") .into(); let mailbox = settings diff --git a/rust/main/hyperlane-base/src/settings/checkpoint_syncer.rs b/rust/main/hyperlane-base/src/settings/checkpoint_syncer.rs index 3434a7168a..b1335f12e9 100644 --- a/rust/main/hyperlane-base/src/settings/checkpoint_syncer.rs +++ b/rust/main/hyperlane-base/src/settings/checkpoint_syncer.rs @@ -4,6 +4,7 @@ use crate::{ }; use core::str::FromStr; use eyre::{eyre, Context, Report, Result}; +use hyperlane_core::{ChainCommunicationError, ReorgEvent}; use prometheus::IntGauge; use rusoto_core::Region; use std::{env, path::PathBuf}; @@ -41,6 +42,20 @@ pub enum CheckpointSyncerConf { }, } +/// Checkpoint Syncer errors +#[derive(Debug, thiserror::Error)] +pub enum CheckpointSyncerBuildError { + /// A reorg event has been detected in the checkpoint syncer when building it + #[error("A reorg event has been detected: {0:?}")] + ReorgEvent(ReorgEvent), + /// Error communicating with the chain + #[error(transparent)] + ChainError(#[from] ChainCommunicationError), + /// Other errors + #[error(transparent)] + Other(#[from] Report), +} + impl FromStr for CheckpointSyncerConf { type Err = Report; @@ -110,15 +125,12 @@ impl CheckpointSyncerConf { pub async fn build_and_validate( &self, latest_index_gauge: Option, - ) -> Result, Report> { + ) -> Result, CheckpointSyncerBuildError> { let syncer: Box = self.build(latest_index_gauge).await?; match syncer.reorg_status().await { Ok(Some(reorg_event)) => { - panic!( - "A reorg event has been detected: {:#?}. Please resolve the reorg to continue.", - reorg_event - ); + return Err(CheckpointSyncerBuildError::ReorgEvent(reorg_event)); } Err(err) => { error!( @@ -177,10 +189,7 @@ impl CheckpointSyncerConf { #[cfg(test)] mod test { - use std::panic::AssertUnwindSafe; - - use futures_util::FutureExt; - use hyperlane_core::{ReorgEvent, ReorgPeriod, H256}; + use hyperlane_core::{ReorgPeriod, H256}; #[tokio::test] async fn test_build_and_validate() { @@ -191,6 +200,23 @@ mod test { let checkpoint_path = format!("file://{}", temp_checkpoint_dir.path().to_str().unwrap()); let checkpoint_syncer_conf = CheckpointSyncerConf::from_str(&checkpoint_path).unwrap(); + let dummy_local_merkle_root = + H256::from_str("0x8da44bc8198e9874db215ec2000037c58e16918c94743d70c838ecb10e243c64") + .unwrap(); + let dummy_canonical_merkle_root = + H256::from_str("0xb437b888332ef12f7260c7f679aad3c96b91ab81c2dc7242f8b290f0b6bba92b") + .unwrap(); + let dummy_checkpoint_index = 56; + let unix_timestamp = 1620000000; + let reorg_period = ReorgPeriod::from_blocks(5); + let dummy_reorg_event = ReorgEvent { + local_merkle_root: dummy_local_merkle_root, + canonical_merkle_root: dummy_canonical_merkle_root, + checkpoint_index: dummy_checkpoint_index, + unix_timestamp, + reorg_period, + }; + // create a checkpoint syncer and write a reorg event // then `drop` it, to simulate a restart { @@ -199,24 +225,6 @@ mod test { .await .unwrap(); - let dummy_local_merkle_root = H256::from_str( - "0x8da44bc8198e9874db215ec2000037c58e16918c94743d70c838ecb10e243c64", - ) - .unwrap(); - let dummy_canonical_merkle_root = H256::from_str( - "0xb437b888332ef12f7260c7f679aad3c96b91ab81c2dc7242f8b290f0b6bba92b", - ) - .unwrap(); - let dummy_checkpoint_index = 56; - let unix_timestamp = 1620000000; - let reorg_period = ReorgPeriod::from_blocks(5); - let dummy_reorg_event = ReorgEvent { - local_merkle_root: dummy_local_merkle_root, - canonical_merkle_root: dummy_canonical_merkle_root, - checkpoint_index: dummy_checkpoint_index, - unix_timestamp, - reorg_period, - }; checkpoint_syncer .write_reorg_status(&dummy_reorg_event) .await @@ -224,29 +232,12 @@ mod test { } // Initialize a new checkpoint syncer and expect it to panic due to the reorg event. - // `AssertUnwindSafe` is required for ignoring some type checks so the panic can be caught. - let startup_result = AssertUnwindSafe(checkpoint_syncer_conf.build_and_validate(None)) - .catch_unwind() - .await - .unwrap_err(); - if let Some(err) = startup_result.downcast_ref::() { - assert_eq!( - *err, - r#"A reorg event has been detected: ReorgEvent { - local_merkle_root: 0x8da44bc8198e9874db215ec2000037c58e16918c94743d70c838ecb10e243c64, - canonical_merkle_root: 0xb437b888332ef12f7260c7f679aad3c96b91ab81c2dc7242f8b290f0b6bba92b, - checkpoint_index: 56, - unix_timestamp: 1620000000, - reorg_period: Blocks( - 5, - ), -}. Please resolve the reorg to continue."# - ); - } else { - panic!( - "Caught panic has a different type than the expected one (`String`): {:?}", - startup_result - ); + let result = checkpoint_syncer_conf.build_and_validate(None).await; + match result { + Err(CheckpointSyncerBuildError::ReorgEvent(e)) => { + assert_eq!(e, dummy_reorg_event, "Reported reorg event doesn't match"); + } + _ => panic!("Expected a reorg event error"), } } } diff --git a/rust/main/hyperlane-core/src/traits/pending_operation.rs b/rust/main/hyperlane-core/src/traits/pending_operation.rs index e0a6622c3a..a5c51f60f7 100644 --- a/rust/main/hyperlane-core/src/traits/pending_operation.rs +++ b/rust/main/hyperlane-core/src/traits/pending_operation.rs @@ -250,6 +250,9 @@ pub enum ReprepareReason { #[strum(to_string = "Delivery transaction reverted or reorged")] /// Delivery transaction reverted or reorged RevertedOrReorged, + /// The metadata building was refused for the message. + #[strum(to_string = "Message metadata refused")] + MessageMetadataRefused, } #[derive(Display, Debug, Clone, Serialize, Deserialize, PartialEq)] diff --git a/rust/main/hyperlane-core/src/types/reorg.rs b/rust/main/hyperlane-core/src/types/reorg.rs index eb41844c0e..74faf8626a 100644 --- a/rust/main/hyperlane-core/src/types/reorg.rs +++ b/rust/main/hyperlane-core/src/types/reorg.rs @@ -4,7 +4,7 @@ use serde::{Deserialize, Serialize}; use crate::{ReorgPeriod, H256}; /// Details about a detected chain reorg, from an agent's perspective -#[derive(Debug, Clone, Serialize, Deserialize, new)] +#[derive(Debug, Clone, Serialize, Deserialize, new, PartialEq, Default)] pub struct ReorgEvent { /// the merkle root built from this agent's indexed events pub local_merkle_root: H256, diff --git a/rust/main/utils/run-locally/src/main.rs b/rust/main/utils/run-locally/src/main.rs index e19fa1564d..8f5f74dad3 100644 --- a/rust/main/utils/run-locally/src/main.rs +++ b/rust/main/utils/run-locally/src/main.rs @@ -18,6 +18,7 @@ use std::{ collections::HashMap, fs::{self, File}, + io::Write, path::Path, process::{Child, ExitCode}, sync::{ @@ -29,6 +30,7 @@ use std::{ }; use ethers_contract::MULTICALL_ADDRESS; +use hyperlane_core::{PendingOperationStatus, ReorgEvent, ReprepareReason}; use logging::log; pub use metrics::fetch_metric; use once_cell::sync::Lazy; @@ -460,6 +462,23 @@ fn main() -> ExitCode { return report_test_result(test_passed); } + // Simulate a reorg, which we'll later use + // to ensure the relayer handles reorgs correctly. + // Kill validator 1 to make sure it doesn't crash by detecting it posted a reorg, + // causing e2e to also fail. + stop_validator(&mut state, 1); + set_validator_reorg_flag(&checkpoints_dirs, 1); + + // Send a single message from validator 1's origin chain to test the relayer's reorg handling. + Program::new("yarn") + .working_dir(INFRA_PATH) + .cmd("kathy") + .arg("messages", "1") + .arg("timeout", "1000") + .arg("single-origin", "test1") + .run() + .join(); + // Here we want to restart the relayer and validate // its restart behaviour. restart_relayer(&config, &mut state, &rocks_db_dir); @@ -472,7 +491,7 @@ fn main() -> ExitCode { test_passed = wait_for_condition( &config, loop_start, - relayer_restart_invariants_met, + || Ok(relayer_restart_invariants_met()? && relayer_reorg_handling_invariants_met()?), || !SHUTDOWN.load(Ordering::Relaxed), || long_running_processes_exited_check(&mut state), ); @@ -559,6 +578,34 @@ fn create_relayer(config: &Config, rocks_db_dir: &TempDir) -> Program { } } +/// Kills relayer in State and respawns the relayer again +fn stop_validator(state: &mut State, validator_index: usize) { + let name = format!("VL{}", validator_index + 1); + log!("Stopping validator {}...", name); + let (child, _) = state + .agents + .get_mut(&name) + .unwrap_or_else(|| panic!("Validator {} not found", name)); + child + .kill() + .unwrap_or_else(|_| panic!("Failed to stop validator {}", name)); + // Remove the validator from the state + state.agents.remove(&name); +} + +fn set_validator_reorg_flag(checkpoints_dirs: &[DynPath], validator_index: usize) { + let reorg_event = ReorgEvent::default(); + + let checkpoint_path = (*checkpoints_dirs[validator_index]).as_ref(); + let reorg_flag_path = checkpoint_path.join("reorg_flag.json"); + let mut reorg_flag_file = + File::create(reorg_flag_path).expect("Failed to create reorg flag file"); + // Write to file + let _ = reorg_flag_file + .write(serde_json::to_string(&reorg_event).unwrap().as_bytes()) + .expect("Failed to write to reorg flag file"); +} + /// Kills relayer in State and respawns the relayer again fn restart_relayer(config: &Config, state: &mut State, rocks_db_dir: &TempDir) { log!("Stopping relayer..."); @@ -571,6 +618,25 @@ fn restart_relayer(config: &Config, state: &mut State, rocks_db_dir: &TempDir) { log!("Restarted relayer..."); } +fn relayer_reorg_handling_invariants_met() -> eyre::Result { + let refused_messages = fetch_metric( + RELAYER_METRICS_PORT, + "hyperlane_submitter_queue_length", + &HashMap::from([( + "operation_status", + PendingOperationStatus::Retry(ReprepareReason::MessageMetadataRefused) + .to_string() + .as_str(), + )]), + )?; + if refused_messages.iter().sum::() == 0 { + log!("Relayer still doesn't have any MessageMetadataRefused messages in the queue."); + return Ok(false); + }; + + Ok(true) +} + /// Check relayer restart behaviour is correct. /// So far, we only check if undelivered messages' statuses /// are correctly retrieved from the database @@ -585,7 +651,7 @@ fn relayer_restart_invariants_met() -> eyre::Result { let no_metadata_message_count = *matched_logs .get(&line_filters) - .expect("Failed to get matched message count"); + .ok_or_else(|| eyre::eyre!("No logs matched line filters"))?; // These messages are never inserted into the merkle tree. // So these messages will never be deliverable and will always // be in a CouldNotFetchMetadata state. diff --git a/typescript/infra/scripts/send-test-messages.ts b/typescript/infra/scripts/send-test-messages.ts index 6775ac5534..41d9501e3e 100644 --- a/typescript/infra/scripts/send-test-messages.ts +++ b/typescript/infra/scripts/send-test-messages.ts @@ -127,6 +127,24 @@ function getArgs() { default: false, describe: 'Mine forever after sending messages', }) + .option('singleOrigin', { + type: 'string', + default: undefined, + describe: 'If specified, only sends from a single origin chain', + coerce: (arg) => { + if (arg) { + if (!Object.values(TestChainName).includes(arg)) { + throw new Error( + `Invalid chain name ${arg}. Must be one of ${Object.values( + TestChainName, + )}`, + ); + } + return arg as TestChainName; + } + return undefined; + }, + }) .option(MailboxHookType.DEFAULT, { type: 'string', describe: 'Description for defaultHook', @@ -143,7 +161,8 @@ function getArgs() { async function main() { const args = await getArgs(); - const { timeout, defaultHook, requiredHook, mineforever } = args; + const { timeout, defaultHook, requiredHook, mineforever, singleOrigin } = + args; let messages = args.messages; // Limit the test chains to a subset of the known chains @@ -179,8 +198,14 @@ async function main() { // Generate artificial traffic const run_forever = messages === 0; while (run_forever || messages-- > 0) { - // Round robin origin chain - const local = kathyTestChains[messages % kathyTestChains.length]; + // By default, round robin origin chain + let local: TestChainName; + if (singleOrigin) { + local = singleOrigin; + } else { + local = kathyTestChains[messages % kathyTestChains.length]; + } + // Random remote chain const remote: ChainName = randomElement( kathyTestChains.filter((c) => c !== local),