Skip to content

Commit

Permalink
fix: checkpoints getter and crash due to the usage of default MemTrac…
Browse files Browse the repository at this point in the history
…ker in MemoryDB (#126)
  • Loading branch information
gshep authored Sep 11, 2024
1 parent 1ac623d commit 26cc301
Show file tree
Hide file tree
Showing 9 changed files with 138 additions and 69 deletions.
5 changes: 3 additions & 2 deletions ethereum-common/src/memory_db.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use super::{keccak_hasher::KeccakHasher, *};
use ::memory_db::HashKey;
use ::memory_db::{HashKey, NoopTracker};

pub type MemoryDB = ::memory_db::MemoryDB<KeccakHasher, HashKey<KeccakHasher>, Vec<u8>>;
pub type MemoryDB =
::memory_db::MemoryDB<KeccakHasher, HashKey<KeccakHasher>, Vec<u8>, NoopTracker<Vec<u8>>>;

pub fn new() -> MemoryDB {
memory_db::MemoryDB::from_null_node(&rlp::NULL_RLP, rlp::NULL_RLP.as_ref().into())
Expand Down
101 changes: 74 additions & 27 deletions gear-programs/checkpoint-light-client/src/wasm/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,37 +125,26 @@ impl<const N: usize> Checkpoints<N> {

Err(0) => Err(CheckpointError::OutDated),

Err(index) if index < self.slots.len() => {
Err(index) => {
let (index_previous, slot_previous) = self.slots[index - 1];
let (index_next, slot_next) = self.slots[index];

let gap = match (slot_next - slot_previous) % SLOTS_PER_EPOCH {
// both slots are divisable by SLOTS_PER_EPOCH and the distance
// between them is greater than SLOTS_PER_EPOCH
0 if slot_previous + SLOTS_PER_EPOCH < slot_next => true,
_ => false,
};

let offset = ((slot - 1 - slot_previous) / SLOTS_PER_EPOCH + 1) as usize;
let slot_checkpoint = slot_previous + offset as u64 * SLOTS_PER_EPOCH;
if slot_previous % SLOTS_PER_EPOCH != 0
|| slot_next < slot_checkpoint
|| slot_next > slot_checkpoint + SLOTS_PER_EPOCH
|| gap
{
Ok((slot_next, self.checkpoints[index_next]))
} else {
Ok((slot_checkpoint, self.checkpoints[index_previous + offset]))
let maybe_next = self.slots.get(index);

let count = maybe_next
.map(|(index_next, _slot)| *index_next)
.unwrap_or(self.checkpoints.len())
- index_previous;
for i in 1..count {
let slot_next = slot_previous + i as u64 * SLOTS_PER_EPOCH;
if slot <= slot_next {
return Ok((slot_next, self.checkpoints[index_previous + i]));
}
}
}

_ => {
let offset = ((slot - 1 - slot_last) / SLOTS_PER_EPOCH + 1) as usize;
let slot_checkpoint = slot_last + offset as u64 * SLOTS_PER_EPOCH;
let index = index_last + offset;
match self.checkpoints.get(index) {
Some(checkpoint) => Ok((slot_checkpoint, *checkpoint)),
match maybe_next {
None => Err(CheckpointError::NotPresent),
Some((index_next, slot_next)) => {
Ok((*slot_next, self.checkpoints[*index_next]))
}
}
}
}
Expand Down Expand Up @@ -423,3 +412,61 @@ fn checkpoints_with_gaps() {

compare_checkpoints(&data, &checkpoints);
}

#[test]
fn checkpoints_get() {
use hex_literal::hex;

const COUNT: usize = 7;

// Holesky
let data = [
(
2_498_432,
hex!("192cbc312720ee203ed023837c7dd7783db6cee1f1b9d57411f348e8a143a308").into(),
),
(
2_498_464,
hex!("b89c6d200193f865b85a3f323b75d2b10346564a330229d8a5c695968206faf1").into(),
),
(
2_498_496,
hex!("4185e76eb0865e9ae5f8ea7601407261d1db9e66ba10818ebe717976d9bf201c").into(),
),
(
2_498_527,
hex!("e722020546e89a17228aa9365e5418aaf09d9c31b014a0b4df911a54702ccd57").into(),
),
(
2_498_560,
hex!("b50cd206a8ba4019baad810bbcd4fe1871be4944ea9cb06e15259376e996afde").into(),
),
(
2_498_592,
hex!("844300ded738bdad37cc202ad4ade0cc79f0e4aa311e8fee5668cb20341c52aa").into(),
),
(
2_498_624,
hex!("aca973372ac65cd5203e1521ba941bbbf836c5e591a9b459ca061c79a5740023").into(),
),
];
assert_eq!(data.len(), COUNT);

let mut checkpoints = Checkpoints::<COUNT>::new();

for (slot, checkpoint) in &data {
checkpoints.push(*slot, *checkpoint);
}

assert!(checkpoints.checkpoint(2_498_625).is_err());

for i in 1..data.len() {
let (slot_previous, _checkpoint) = data[i - 1];
let (expected_slot, expected_checkpoint) = data[i];
for slot in (1 + slot_previous)..=expected_slot {
let (actual_slot, actual_checkpoint) = checkpoints.checkpoint(slot).unwrap();
assert_eq!(actual_slot, expected_slot, "slot = {slot}");
assert_eq!(actual_checkpoint, expected_checkpoint);
}
}
}
43 changes: 21 additions & 22 deletions gear-programs/checkpoint-light-client/src/wasm/sync_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,27 @@ use committee::{Error as CommitteeError, Update as CommitteeUpdate};
use gstd::debug;

pub async fn handle(state: &mut State<STORED_CHECKPOINTS_COUNT>, sync_update: SyncCommitteeUpdate) {
if eth_utils::calculate_epoch(state.finalized_header.slot) + io::sync_update::MAX_EPOCHS_GAP
<= eth_utils::calculate_epoch(sync_update.finalized_header.slot)
{
let result = HandleResult::SyncUpdate(Err(io::sync_update::Error::ReplayBackRequired {
replay_back: state
.replay_back
.as_ref()
.map(|replay_back| meta::ReplayBack {
finalized_header: replay_back.finalized_header.slot,
last_header: replay_back.last_header.slot,
}),
checkpoint: state
.checkpoints
.last()
.expect("The program should be initialized so there is a checkpoint"),
}));
msg::reply(result, 0).expect("Unable to reply with `HandleResult::SyncUpdate::Error`");

return;
}

let (finalized_header_update, committee_update) = match verify(
&state.network,
&state.finalized_header,
Expand All @@ -23,28 +44,6 @@ pub async fn handle(state: &mut State<STORED_CHECKPOINTS_COUNT>, sync_update: Sy
};

if let Some(finalized_header) = finalized_header_update {
if eth_utils::calculate_epoch(state.finalized_header.slot) + io::sync_update::MAX_EPOCHS_GAP
<= eth_utils::calculate_epoch(finalized_header.slot)
{
let result =
HandleResult::SyncUpdate(Err(io::sync_update::Error::ReplayBackRequired {
replay_back: state
.replay_back
.as_ref()
.map(|replay_back| meta::ReplayBack {
finalized_header: replay_back.finalized_header.slot,
last_header: replay_back.last_header.slot,
}),
checkpoint: state
.checkpoints
.last()
.expect("The program should be initialized so there is a checkpoint"),
}));
msg::reply(result, 0).expect("Unable to reply with `HandleResult::SyncUpdate::Error`");

return;
}

state
.checkpoints
.push(finalized_header.slot, finalized_header.tree_hash_root());
Expand Down
9 changes: 7 additions & 2 deletions relayer/src/ethereum_checkpoints/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use futures::{
};
use gclient::{EventProcessor, GearApi, WSAddress};
use parity_scale_codec::Decode;
use reqwest::Client;
use reqwest::{Client, ClientBuilder};
use tokio::{
signal::unix::{self, SignalKind},
sync::mpsc::{self, Sender},
Expand Down Expand Up @@ -41,6 +41,7 @@ pub async fn relay(args: RelayCheckpointsArgs) {
let RelayCheckpointsArgs {
program_id,
beacon_endpoint,
beacon_timeout,
vara_domain,
vara_port,
vara_suri,
Expand All @@ -59,10 +60,14 @@ pub async fn relay(args: RelayCheckpointsArgs) {
.and_then(|bytes| <[u8; 32]>::try_from(bytes).ok())
.expect("Expecting correct ProgramId");

let client_http = ClientBuilder::new()
.timeout(Duration::from_secs(beacon_timeout))
.build()
.expect("Reqwest client should be created");

let mut signal_interrupt = unix::signal(SignalKind::interrupt()).expect("Set SIGINT handler");

let (sender, mut receiver) = mpsc::channel(SIZE_CHANNEL);
let client_http = Client::new();

sync_update::spawn_receiver(client_http.clone(), beacon_endpoint.clone(), sender);

Expand Down
1 change: 1 addition & 0 deletions relayer/src/ethereum_checkpoints/replay_back.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ async fn replay_back_slots(
slots_batch_iter: SlotsBatchIter,
) -> AnyResult<()> {
for (slot_start, slot_end) in slots_batch_iter {
log::debug!("slot_start = {slot_start}, slot_end = {slot_end}");
replay_back_slots_inner(
client_http,
beacon_endpoint,
Expand Down
40 changes: 25 additions & 15 deletions relayer/src/ethereum_checkpoints/tests/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::utils::{self, slots_batch, FinalityUpdateResponse};
use super::utils::{self, slots_batch, BootstrapResponse, FinalityUpdateResponse, UpdateData};
use checkpoint_light_client::WASM_BINARY;
use checkpoint_light_client_io::{
ethereum_common::{
Expand All @@ -19,6 +19,8 @@ const FINALITY_UPDATE_5_254_112: &[u8; 4_940] =
include_bytes!("./sepolia-finality-update-5_254_112.json");
const FINALITY_UPDATE_5_263_072: &[u8; 4_941] =
include_bytes!("./sepolia-finality-update-5_263_072.json");
const UPDATE_640: &[u8; 57_202] = include_bytes!("./sepolia-update-640.json");
const BOOTSTRAP_640: &[u8; 54_328] = include_bytes!("./sepolia-bootstrap-640.json");

async fn common_upload_program(
client: &GearApi,
Expand Down Expand Up @@ -65,12 +67,13 @@ async fn init(network: Network) -> Result<()> {

// use the latest finality header as a checkpoint for bootstrapping
let finality_update = utils::get_finality_update(&client_http, RPC_URL).await?;
let current_period = eth_utils::calculate_period(finality_update.finalized_header.slot);
let slot = finality_update.finalized_header.slot;
let current_period = eth_utils::calculate_period(slot);
let mut updates = utils::get_updates(&client_http, RPC_URL, current_period, 1).await?;

println!(
"finality_update slot = {}, period = {}",
finality_update.finalized_header.slot, current_period
slot, current_period
);

let update = match updates.pop() {
Expand Down Expand Up @@ -419,11 +422,8 @@ async fn replaying_back() -> Result<()> {
Ok(())
}

#[ignore]
#[tokio::test]
async fn sync_update_requires_replaying_back() -> Result<()> {
let client_http = Client::new();

let finality_update: FinalityUpdateResponse =
serde_json::from_slice(FINALITY_UPDATE_5_263_072).unwrap();
let finality_update = finality_update.data;
Expand All @@ -433,18 +433,25 @@ async fn sync_update_requires_replaying_back() -> Result<()> {
);

let slot = finality_update.finalized_header.slot;
let current_period = eth_utils::calculate_period(slot);
let mut updates = utils::get_updates(&client_http, RPC_URL, current_period, 1).await?;
let mut updates: Vec<UpdateData> = serde_json::from_slice(UPDATE_640).unwrap();

let update = match updates.pop() {
Some(update) if updates.is_empty() => update.data,
_ => unreachable!("Requested single update"),
};

let checkpoint = update.finalized_header.tree_hash_root();
let checkpoint_hex = hex::encode(checkpoint);
let BootstrapResponse { data: bootstrap } = serde_json::from_slice(BOOTSTRAP_640).unwrap();

let checkpoint_update = update.finalized_header.tree_hash_root();
let checkpoint_bootstrap = bootstrap.header.tree_hash_root();
assert_eq!(
checkpoint_update,
checkpoint_bootstrap,
"checkpoint_update = {}, checkpoint_bootstrap = {}",
hex::encode(checkpoint_update),
hex::encode(checkpoint_bootstrap)
);

let bootstrap = utils::get_bootstrap(&client_http, RPC_URL, &checkpoint_hex).await?;
let signature = <G2 as ark_serialize::CanonicalDeserialize>::deserialize_compressed(
&update.sync_aggregate.sync_committee_signature.0 .0[..],
)
Expand Down Expand Up @@ -497,10 +504,13 @@ async fn sync_update_requires_replaying_back() -> Result<()> {

let (_message_id, payload, _value) = listener.reply_bytes_on(message_id).await?;
let result_decoded = HandleResult::decode(&mut &payload.unwrap()[..]).unwrap();
assert!(matches!(
result_decoded,
HandleResult::SyncUpdate(Err(sync_update::Error::ReplayBackRequired { .. }))
));
assert!(
matches!(
result_decoded,
HandleResult::SyncUpdate(Err(sync_update::Error::ReplayBackRequired { .. }))
),
"result_decoded = {result_decoded:?}"
);

Ok(())
}

Large diffs are not rendered by default.

Large diffs are not rendered by default.

6 changes: 5 additions & 1 deletion relayer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,14 @@ struct RelayCheckpointsArgs {
#[arg(long, env = "CHECKPOINT_LIGHT_CLIENT_ADDRESS")]
program_id: String,

/// Specify an endpoint providing Beacon API
/// Specify the endpoint providing Beacon API
#[arg(long, env = "BEACON_ENDPOINT")]
beacon_endpoint: String,

/// Specify the timeout in seconds for requests to the Beacon API endpoint
#[arg(long, default_value = "120", env = "BEACON_TIMEOUT")]
beacon_timeout: u64,

/// Domain of the VARA RPC endpoint
#[arg(long, default_value = "ws://127.0.0.1", env = "VARA_DOMAIN")]
vara_domain: String,
Expand Down

0 comments on commit 26cc301

Please sign in to comment.