diff --git a/Cargo.lock b/Cargo.lock index 9f1111d9..66bd9a7a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1971,26 +1971,17 @@ dependencies = [ name = "checkpoint_light_client" version = "0.1.0" dependencies = [ - "anyhow", - "ark-bls12-381", "ark-serialize 0.4.2", "checkpoint_light_client-io", "circular-buffer", - "futures", "gbuiltin-bls381", - "gclient", "gear-wasm-builder", "getrandom 0.2.15", "gstd", - "hex", "hex-literal", "lazy_static", "parity-scale-codec", "primitive-types 0.12.2", - "reqwest 0.11.27", - "serde", - "serde_json", - "tokio", "tree_hash_derive", ] diff --git a/gear-programs/checkpoint-light-client/Cargo.toml b/gear-programs/checkpoint-light-client/Cargo.toml index d95c579c..932d5eab 100644 --- a/gear-programs/checkpoint-light-client/Cargo.toml +++ b/gear-programs/checkpoint-light-client/Cargo.toml @@ -18,19 +18,8 @@ tree_hash_derive.workspace = true gear-wasm-builder.workspace = true checkpoint_light_client-io.workspace = true -[dev-dependencies] -gclient.workspace = true -ark-bls12-381 = { workspace = true, features = ["std"] } -serde = { workspace = true, features = ["std"] } -futures.workspace = true -tokio.workspace = true -hex = { workspace = true, features = ["std"] } -reqwest.workspace = true -serde_json.workspace = true -anyhow.workspace = true - [target.'cfg(all(target_arch = "wasm32", target_os = "unknown"))'.dependencies] -getrandom = { version = "0.2", default-features = false, features = ["js"] } +getrandom = { version = "0.2", default-features = false, features = ["custom"] } lazy_static = { version = "1.1", features = ["spin_no_std"] } [features] diff --git a/relayer/src/ethereum_checkpoints/mod.rs b/relayer/src/ethereum_checkpoints/mod.rs index 01c10620..09cd2de3 100644 --- a/relayer/src/ethereum_checkpoints/mod.rs +++ b/relayer/src/ethereum_checkpoints/mod.rs @@ -2,6 +2,7 @@ use super::*; use anyhow::{anyhow, Result as AnyResult}; use checkpoint_light_client_io::{ ethereum_common::{utils as eth_utils, SLOTS_PER_EPOCH}, + meta::ReplayBack, tree_hash::Hash256, Handle, HandleResult, Slot, SyncCommitteeUpdate, G2, }; @@ -9,7 +10,7 @@ use futures::{ future::{self, Either}, pin_mut, }; -use gclient::{EventListener, EventProcessor, GearApi, WSAddress}; +use gclient::{EventProcessor, GearApi, WSAddress}; use parity_scale_codec::Decode; use reqwest::Client; use tokio::{ @@ -75,11 +76,6 @@ pub async fn relay(args: RelayCheckpointsArgs) { let gas_limit = gas_limit_block / 100 * 95; log::info!("Gas limit for extrinsics: {gas_limit}"); - let mut listener = client - .subscribe() - .await - .expect("Events listener should be created"); - let sync_update = receiver .recv() .await @@ -87,15 +83,7 @@ pub async fn relay(args: RelayCheckpointsArgs) { let mut slot_last = sync_update.finalized_header.slot; - match sync_update::try_to_apply( - &client, - &mut listener, - program_id, - sync_update.clone(), - gas_limit, - ) - .await - { + match sync_update::try_to_apply(&client, program_id, sync_update.clone(), gas_limit).await { Err(e) => { log::error!("{e:?}"); return; @@ -108,21 +96,17 @@ pub async fn relay(args: RelayCheckpointsArgs) { &client_http, &beacon_endpoint, &client, - &mut listener, program_id, gas_limit, - replay_back.map(|r| r.last_header), + replay_back, checkpoint, sync_update, ) .await { - log::error!("{e:?}"); + log::error!("{e:?}. Exiting"); + return; } - - log::info!("Exiting"); - - return; } Ok(Ok(_) | Err(sync_update::Error::NotActual)) => (), _ => { @@ -171,9 +155,7 @@ pub async fn relay(args: RelayCheckpointsArgs) { } let committee_update = sync_update.sync_committee_next_pub_keys.is_some(); - match sync_update::try_to_apply(&client, &mut listener, program_id, sync_update, gas_limit) - .await - { + match sync_update::try_to_apply(&client, program_id, sync_update, gas_limit).await { Ok(Ok(_)) => { slot_last = slot; @@ -186,10 +168,15 @@ pub async fn relay(args: RelayCheckpointsArgs) { } } Ok(Err(sync_update::Error::ReplayBackRequired { .. })) => { - log::info!("Replay back within the main loop. Exiting"); + log::error!("Replay back within the main loop. Exiting"); return; } - Ok(Err(e)) => log::info!("The program failed with: {e:?}. Skipping"), + Ok(Err(e)) => { + log::error!("The program failed with: {e:?}. Skipping"); + if let sync_update::Error::NotActual = e { + slot_last = slot; + } + } Err(e) => { log::error!("{e:?}"); return; diff --git a/relayer/src/ethereum_checkpoints/replay_back.rs b/relayer/src/ethereum_checkpoints/replay_back.rs index 3443c381..3c6b46e8 100644 --- a/relayer/src/ethereum_checkpoints/replay_back.rs +++ b/relayer/src/ethereum_checkpoints/replay_back.rs @@ -7,17 +7,20 @@ pub async fn execute( client_http: &Client, beacon_endpoint: &str, client: &GearApi, - listener: &mut EventListener, program_id: [u8; 32], gas_limit: u64, - replayed_slot: Option, + replay_back: Option, checkpoint: (Slot, Hash256), sync_update: SyncCommitteeUpdate, ) -> AnyResult<()> { log::info!("Replaying back started"); let (mut slot_start, _) = checkpoint; - if let Some(slot_end) = replayed_slot { + if let Some(ReplayBack { + finalized_header, + last_header: slot_end, + }) = replay_back + { let slots_batch_iter = SlotsBatchIter::new(slot_start, slot_end, SIZE_BATCH) .ok_or(anyhow!("Failed to create slots_batch::Iter with slot_start = {slot_start}, slot_end = {slot_end}."))?; @@ -25,16 +28,13 @@ pub async fn execute( client_http, beacon_endpoint, client, - listener, program_id, gas_limit, slots_batch_iter, ) .await?; - log::info!("The ongoing replaying back finished"); - - return Ok(()); + slot_start = finalized_header; } let period_start = 1 + eth_utils::calculate_period(slot_start); @@ -65,7 +65,6 @@ pub async fn execute( client_http, beacon_endpoint, client, - listener, program_id, gas_limit, slots_batch_iter.next(), @@ -77,7 +76,6 @@ pub async fn execute( client_http, beacon_endpoint, client, - listener, program_id, gas_limit, slots_batch_iter, @@ -97,7 +95,6 @@ pub async fn execute( client_http, beacon_endpoint, client, - listener, program_id, gas_limit, slots_batch_iter.next(), @@ -109,7 +106,6 @@ pub async fn execute( client_http, beacon_endpoint, client, - listener, program_id, gas_limit, slots_batch_iter, @@ -125,7 +121,6 @@ async fn replay_back_slots( client_http: &Client, beacon_endpoint: &str, client: &GearApi, - listener: &mut EventListener, program_id: [u8; 32], gas_limit: u64, slots_batch_iter: SlotsBatchIter, @@ -135,7 +130,6 @@ async fn replay_back_slots( client_http, beacon_endpoint, client, - listener, program_id, slot_start, slot_end, @@ -152,7 +146,6 @@ async fn replay_back_slots_inner( client_http: &Client, beacon_endpoint: &str, client: &GearApi, - listener: &mut EventListener, program_id: [u8; 32], slot_start: Slot, slot_end: Slot, @@ -162,6 +155,8 @@ async fn replay_back_slots_inner( request_headers(client_http, beacon_endpoint, slot_start, slot_end).await?, ); + let mut listener = client.subscribe().await?; + let (message_id, _) = client .send_message(program_id.into(), payload, gas_limit, 0) .await @@ -190,7 +185,6 @@ async fn replay_back_slots_start( client_http: &Client, beacon_endpoint: &str, client: &GearApi, - listener: &mut EventListener, program_id: [u8; 32], gas_limit: u64, slots: Option<(Slot, Slot)>, @@ -205,6 +199,8 @@ async fn replay_back_slots_start( headers: request_headers(client_http, beacon_endpoint, slot_start, slot_end).await?, }; + let mut listener = client.subscribe().await?; + let (message_id, _) = client .send_message(program_id.into(), payload, gas_limit, 0) .await diff --git a/relayer/src/ethereum_checkpoints/sync_update.rs b/relayer/src/ethereum_checkpoints/sync_update.rs index dad03014..d211f88b 100644 --- a/relayer/src/ethereum_checkpoints/sync_update.rs +++ b/relayer/src/ethereum_checkpoints/sync_update.rs @@ -75,11 +75,12 @@ async fn receive( pub async fn try_to_apply( client: &GearApi, - listener: &mut EventListener, program_id: [u8; 32], sync_update: SyncCommitteeUpdate, gas_limit: u64, ) -> AnyResult> { + let mut listener = client.subscribe().await?; + let payload = Handle::SyncUpdate(sync_update); let (message_id, _) = client .send_message(program_id.into(), payload, gas_limit, 0)