Skip to content

Commit

Permalink
Merge branch 'main' into kunal/fix-fallback-ism-issue
Browse files Browse the repository at this point in the history
  • Loading branch information
aroralanuk authored Dec 5, 2023
2 parents 128aef1 + 8b16ade commit 8b05186
Show file tree
Hide file tree
Showing 89 changed files with 1,134 additions and 619 deletions.
5 changes: 0 additions & 5 deletions .changeset/little-beans-attack.md

This file was deleted.

7 changes: 0 additions & 7 deletions .changeset/odd-keys-pretend.md

This file was deleted.

6 changes: 0 additions & 6 deletions .changeset/smooth-swans-wave.md

This file was deleted.

7 changes: 0 additions & 7 deletions .changeset/tall-shirts-accept.md

This file was deleted.

3 changes: 3 additions & 0 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions rust/agents/relayer/src/merkle_tree/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use hyperlane_base::db::HyperlaneRocksDB;
use hyperlane_core::{HyperlaneDomain, MerkleTreeInsertion};
use prometheus::IntGauge;
use tokio::sync::RwLock;
use tracing::debug;
use tracing::trace;

use crate::processor::ProcessorExt;

Expand Down Expand Up @@ -76,7 +76,7 @@ impl MerkleTreeProcessor {
.set(insertion.index() as i64);
Some(insertion)
} else {
debug!(leaf_index=?self.leaf_index, "No message found in DB for leaf index");
trace!(leaf_index=?self.leaf_index, "No merkle tree insertion found in DB for leaf index, waiting for it to be indexed");
None
};
Ok(leaf)
Expand Down
73 changes: 53 additions & 20 deletions rust/agents/relayer/src/relayer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,15 @@ use derive_more::AsRef;
use eyre::Result;
use hyperlane_base::{
db::{HyperlaneRocksDB, DB},
run_all, BaseAgent, ContractSyncMetrics, CoreMetrics, HyperlaneAgentCore,
SequencedDataContractSync, WatermarkContractSync,
metrics::{AgentMetrics, AgentMetricsUpdater},
run_all,
settings::ChainConf,
BaseAgent, ContractSyncMetrics, CoreMetrics, HyperlaneAgentCore, SequencedDataContractSync,
WatermarkContractSync,
};
use hyperlane_core::{
HyperlaneDomain, HyperlaneMessage, InterchainGasPayment, MerkleTreeInsertion, U256,
metrics::agent::METRICS_SCRAPE_INTERVAL, HyperlaneDomain, HyperlaneMessage,
InterchainGasPayment, MerkleTreeInsertion, U256,
};
use tokio::{
sync::{
Expand Down Expand Up @@ -49,7 +53,7 @@ struct ContextKey {
#[derive(AsRef)]
pub struct Relayer {
origin_chains: HashSet<HyperlaneDomain>,
destination_chains: HashSet<HyperlaneDomain>,
destination_chains: HashMap<HyperlaneDomain, ChainConf>,
#[as_ref]
core: HyperlaneAgentCore,
message_syncs: HashMap<HyperlaneDomain, Arc<SequencedDataContractSync<HyperlaneMessage>>>,
Expand All @@ -67,6 +71,8 @@ pub struct Relayer {
transaction_gas_limit: Option<U256>,
skip_transaction_gas_limit_for: HashSet<u32>,
allow_local_checkpoint_syncers: bool,
core_metrics: Arc<CoreMetrics>,
agent_metrics: AgentMetrics,
}

impl Debug for Relayer {
Expand All @@ -92,11 +98,15 @@ impl BaseAgent for Relayer {

type Settings = RelayerSettings;

async fn from_settings(settings: Self::Settings, metrics: Arc<CoreMetrics>) -> Result<Self>
async fn from_settings(
settings: Self::Settings,
core_metrics: Arc<CoreMetrics>,
agent_metrics: AgentMetrics,
) -> Result<Self>
where
Self: Sized,
{
let core = settings.build_hyperlane_core(metrics.clone());
let core = settings.build_hyperlane_core(core_metrics.clone());
let db = DB::from_path(&settings.db)?;
let dbs = settings
.origin_chains
Expand All @@ -105,18 +115,18 @@ impl BaseAgent for Relayer {
.collect::<HashMap<_, _>>();

let mailboxes = settings
.build_mailboxes(settings.destination_chains.iter(), &metrics)
.build_mailboxes(settings.destination_chains.iter(), &core_metrics)
.await?;
let validator_announces = settings
.build_validator_announces(settings.origin_chains.iter(), &metrics)
.build_validator_announces(settings.origin_chains.iter(), &core_metrics)
.await?;

let contract_sync_metrics = Arc::new(ContractSyncMetrics::new(&metrics));
let contract_sync_metrics = Arc::new(ContractSyncMetrics::new(&core_metrics));

let message_syncs = settings
.build_message_indexers(
settings.origin_chains.iter(),
&metrics,
&core_metrics,
&contract_sync_metrics,
dbs.iter()
.map(|(d, db)| (d.clone(), Arc::new(db.clone()) as _))
Expand All @@ -126,7 +136,7 @@ impl BaseAgent for Relayer {
let interchain_gas_payment_syncs = settings
.build_interchain_gas_payment_indexers(
settings.origin_chains.iter(),
&metrics,
&core_metrics,
&contract_sync_metrics,
dbs.iter()
.map(|(d, db)| (d.clone(), Arc::new(db.clone()) as _))
Expand All @@ -136,7 +146,7 @@ impl BaseAgent for Relayer {
let merkle_tree_hook_syncs = settings
.build_merkle_tree_hook_indexers(
settings.origin_chains.iter(),
&metrics,
&core_metrics,
&contract_sync_metrics,
dbs.iter()
.map(|(d, db)| (d.clone(), Arc::new(db.clone()) as _))
Expand Down Expand Up @@ -188,9 +198,10 @@ impl BaseAgent for Relayer {
.collect();

let mut msg_ctxs = HashMap::new();
let mut destination_chains = HashMap::new();
for destination in &settings.destination_chains {
let destination_chain_setup = core.settings.chain_setup(destination).unwrap().clone();

destination_chains.insert(destination.clone(), destination_chain_setup.clone());
let transaction_gas_limit: Option<U256> =
if skip_transaction_gas_limit_for.contains(&destination.id()) {
None
Expand Down Expand Up @@ -221,7 +232,7 @@ impl BaseAgent for Relayer {
metadata_builder,
origin_gas_payment_enforcer: gas_payment_enforcers[origin].clone(),
transaction_gas_limit,
metrics: MessageSubmissionMetrics::new(&metrics, origin, destination),
metrics: MessageSubmissionMetrics::new(&core_metrics, origin, destination),
}),
);
}
Expand All @@ -230,7 +241,7 @@ impl BaseAgent for Relayer {
Ok(Self {
dbs,
origin_chains: settings.origin_chains,
destination_chains: settings.destination_chains,
destination_chains,
msg_ctxs,
core,
message_syncs,
Expand All @@ -242,6 +253,8 @@ impl BaseAgent for Relayer {
transaction_gas_limit,
skip_transaction_gas_limit_for,
allow_local_checkpoint_syncers: settings.allow_local_checkpoint_syncers,
core_metrics,
agent_metrics,
})
}

Expand All @@ -251,12 +264,32 @@ impl BaseAgent for Relayer {

// send channels by destination chain
let mut send_channels = HashMap::with_capacity(self.destination_chains.len());
for destination in &self.destination_chains {
for (dest_domain, dest_conf) in &self.destination_chains {
let (send_channel, receive_channel) =
mpsc::unbounded_channel::<Box<DynPendingOperation>>();
send_channels.insert(destination.id(), send_channel);
send_channels.insert(dest_domain.id(), send_channel);

tasks.push(self.run_destination_submitter(dest_domain, receive_channel));

tasks.push(self.run_destination_submitter(destination, receive_channel));
let agent_metrics_conf = dest_conf
.agent_metrics_conf(Self::AGENT_NAME.to_string())
.await
.unwrap();
let agent_metrics_fetcher = dest_conf.build_provider(&self.core_metrics).await.unwrap();
let agent_metrics = AgentMetricsUpdater::new(
self.agent_metrics.clone(),
agent_metrics_conf,
agent_metrics_fetcher,
);

let fetcher_task = tokio::spawn(async move {
agent_metrics
.start_updating_on_interval(METRICS_SCRAPE_INTERVAL)
.await;
Ok(())
})
.instrument(info_span!("AgentMetrics"));
tasks.push(fetcher_task);
}

for origin in &self.origin_chains {
Expand Down Expand Up @@ -330,11 +363,11 @@ impl Relayer {
let metrics = MessageProcessorMetrics::new(
&self.core.metrics,
origin,
self.destination_chains.iter(),
self.destination_chains.keys(),
);
let destination_ctxs = self
.destination_chains
.iter()
.keys()
.filter(|&destination| destination != origin)
.map(|destination| {
(
Expand Down
8 changes: 6 additions & 2 deletions rust/agents/scraper/migration/bin/common.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use std::env;
use std::{env, time::Duration};

use migration::sea_orm::{Database, DatabaseConnection};
pub use migration::{DbErr, Migrator, MigratorTrait as _};
use sea_orm::ConnectOptions;

const LOCAL_DATABASE_URL: &str = "postgresql://postgres:47221c18c610@localhost:5432/postgres";
const CONNECT_TIMEOUT: u64 = 20;

pub fn url() -> String {
env::var("DATABASE_URL").unwrap_or_else(|_| LOCAL_DATABASE_URL.into())
Expand All @@ -16,6 +18,8 @@ pub async fn init() -> Result<DatabaseConnection, DbErr> {
.init();

let url = url();
let mut options: ConnectOptions = url.clone().into();
options.connect_timeout(Duration::from_secs(CONNECT_TIMEOUT));
println!("Connecting to {url}");
Database::connect(url).await
Database::connect(options).await
}
5 changes: 3 additions & 2 deletions rust/agents/scraper/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use std::{collections::HashMap, sync::Arc};
use async_trait::async_trait;
use derive_more::AsRef;
use hyperlane_base::{
run_all, settings::IndexSettings, BaseAgent, ContractSyncMetrics, CoreMetrics,
HyperlaneAgentCore,
metrics::AgentMetrics, run_all, settings::IndexSettings, BaseAgent, ContractSyncMetrics,
CoreMetrics, HyperlaneAgentCore,
};
use hyperlane_core::HyperlaneDomain;
use tokio::task::JoinHandle;
Expand Down Expand Up @@ -38,6 +38,7 @@ impl BaseAgent for Scraper {
async fn from_settings(
settings: Self::Settings,
metrics: Arc<CoreMetrics>,
_agent_metrics: AgentMetrics,
) -> eyre::Result<Self>
where
Self: Sized,
Expand Down
7 changes: 6 additions & 1 deletion rust/agents/validator/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use tracing::{error, info, info_span, instrument::Instrumented, warn, Instrument

use hyperlane_base::{
db::{HyperlaneRocksDB, DB},
metrics::AgentMetrics,
run_all, BaseAgent, CheckpointSyncer, ContractSyncMetrics, CoreMetrics, HyperlaneAgentCore,
SequencedDataContractSync,
};
Expand Down Expand Up @@ -51,7 +52,11 @@ impl BaseAgent for Validator {

type Settings = ValidatorSettings;

async fn from_settings(settings: Self::Settings, metrics: Arc<CoreMetrics>) -> Result<Self>
async fn from_settings(
settings: Self::Settings,
metrics: Arc<CoreMetrics>,
_agent_metrics: AgentMetrics,
) -> Result<Self>
where
Self: Sized,
{
Expand Down
15 changes: 10 additions & 5 deletions rust/chains/hyperlane-cosmos/src/aggregation_ism.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::str::FromStr;

use crate::{
address::CosmosAddress,
grpc::{WasmGrpcProvider, WasmProvider},
grpc::WasmProvider,
payloads::aggregate_ism::{ModulesAndThresholdRequest, ModulesAndThresholdResponse},
ConnectionConf, CosmosProvider, Signer,
};
Expand All @@ -18,7 +18,7 @@ use tracing::instrument;
pub struct CosmosAggregationIsm {
domain: HyperlaneDomain,
address: H256,
provider: Box<WasmGrpcProvider>,
provider: Box<CosmosProvider>,
}

impl CosmosAggregationIsm {
Expand All @@ -28,7 +28,12 @@ impl CosmosAggregationIsm {
locator: ContractLocator,
signer: Option<Signer>,
) -> ChainResult<Self> {
let provider = WasmGrpcProvider::new(conf.clone(), locator.clone(), signer)?;
let provider = CosmosProvider::new(
locator.domain.clone(),
conf.clone(),
Some(locator.clone()),
signer,
)?;

Ok(Self {
domain: locator.domain.clone(),
Expand All @@ -50,7 +55,7 @@ impl HyperlaneChain for CosmosAggregationIsm {
}

fn provider(&self) -> Box<dyn HyperlaneProvider> {
Box::new(CosmosProvider::new(self.domain.clone()))
self.provider.clone()
}
}

Expand All @@ -63,7 +68,7 @@ impl AggregationIsm for CosmosAggregationIsm {
) -> ChainResult<(Vec<H256>, u8)> {
let payload = ModulesAndThresholdRequest::new(message);

let data = self.provider.wasm_query(payload, None).await?;
let data = self.provider.grpc().wasm_query(payload, None).await?;
let response: ModulesAndThresholdResponse = serde_json::from_slice(&data)?;

let modules: ChainResult<Vec<H256>> = response
Expand Down
Loading

0 comments on commit 8b05186

Please sign in to comment.