Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: checkpoints getter and crash due to the usage of default MemTracker in MemoryDB #126

Merged
merged 9 commits into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions ethereum-common/src/memory_db.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use super::{keccak_hasher::KeccakHasher, *};
use ::memory_db::HashKey;
use ::memory_db::{HashKey, NoopTracker};

pub type MemoryDB = ::memory_db::MemoryDB<KeccakHasher, HashKey<KeccakHasher>, Vec<u8>>;
pub type MemoryDB =
::memory_db::MemoryDB<KeccakHasher, HashKey<KeccakHasher>, Vec<u8>, NoopTracker<Vec<u8>>>;

pub fn new() -> MemoryDB {
memory_db::MemoryDB::from_null_node(&rlp::NULL_RLP, rlp::NULL_RLP.as_ref().into())
Expand Down
101 changes: 74 additions & 27 deletions gear-programs/checkpoint-light-client/src/wasm/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,37 +125,26 @@ impl<const N: usize> Checkpoints<N> {

Err(0) => Err(CheckpointError::OutDated),

Err(index) if index < self.slots.len() => {
Err(index) => {
let (index_previous, slot_previous) = self.slots[index - 1];
let (index_next, slot_next) = self.slots[index];

let gap = match (slot_next - slot_previous) % SLOTS_PER_EPOCH {
// both slots are divisable by SLOTS_PER_EPOCH and the distance
// between them is greater than SLOTS_PER_EPOCH
0 if slot_previous + SLOTS_PER_EPOCH < slot_next => true,
_ => false,
};

let offset = ((slot - 1 - slot_previous) / SLOTS_PER_EPOCH + 1) as usize;
let slot_checkpoint = slot_previous + offset as u64 * SLOTS_PER_EPOCH;
if slot_previous % SLOTS_PER_EPOCH != 0
|| slot_next < slot_checkpoint
|| slot_next > slot_checkpoint + SLOTS_PER_EPOCH
|| gap
{
Ok((slot_next, self.checkpoints[index_next]))
} else {
Ok((slot_checkpoint, self.checkpoints[index_previous + offset]))
let maybe_next = self.slots.get(index);

let count = maybe_next
.map(|(index_next, _slot)| *index_next)
.unwrap_or(self.checkpoints.len())
- index_previous;
for i in 1..count {
let slot_next = slot_previous + i as u64 * SLOTS_PER_EPOCH;
if slot <= slot_next {
return Ok((slot_next, self.checkpoints[index_previous + i]));
}
}
}

_ => {
let offset = ((slot - 1 - slot_last) / SLOTS_PER_EPOCH + 1) as usize;
let slot_checkpoint = slot_last + offset as u64 * SLOTS_PER_EPOCH;
let index = index_last + offset;
match self.checkpoints.get(index) {
Some(checkpoint) => Ok((slot_checkpoint, *checkpoint)),
match maybe_next {
None => Err(CheckpointError::NotPresent),
Some((index_next, slot_next)) => {
Ok((*slot_next, self.checkpoints[*index_next]))
}
}
}
}
Expand Down Expand Up @@ -423,3 +412,61 @@ fn checkpoints_with_gaps() {

compare_checkpoints(&data, &checkpoints);
}

#[test]
fn checkpoints_get() {
use hex_literal::hex;

const COUNT: usize = 7;

// Holesky
let data = [
(
2_498_432,
hex!("192cbc312720ee203ed023837c7dd7783db6cee1f1b9d57411f348e8a143a308").into(),
),
(
2_498_464,
hex!("b89c6d200193f865b85a3f323b75d2b10346564a330229d8a5c695968206faf1").into(),
),
(
2_498_496,
hex!("4185e76eb0865e9ae5f8ea7601407261d1db9e66ba10818ebe717976d9bf201c").into(),
),
(
2_498_527,
hex!("e722020546e89a17228aa9365e5418aaf09d9c31b014a0b4df911a54702ccd57").into(),
),
(
2_498_560,
hex!("b50cd206a8ba4019baad810bbcd4fe1871be4944ea9cb06e15259376e996afde").into(),
),
(
2_498_592,
hex!("844300ded738bdad37cc202ad4ade0cc79f0e4aa311e8fee5668cb20341c52aa").into(),
),
(
2_498_624,
hex!("aca973372ac65cd5203e1521ba941bbbf836c5e591a9b459ca061c79a5740023").into(),
),
];
assert_eq!(data.len(), COUNT);

let mut checkpoints = Checkpoints::<COUNT>::new();

for (slot, checkpoint) in &data {
checkpoints.push(*slot, *checkpoint);
}

assert!(checkpoints.checkpoint(2_498_625).is_err());

for i in 1..data.len() {
let (slot_previous, _checkpoint) = data[i - 1];
let (expected_slot, expected_checkpoint) = data[i];
for slot in (1 + slot_previous)..=expected_slot {
let (actual_slot, actual_checkpoint) = checkpoints.checkpoint(slot).unwrap();
assert_eq!(actual_slot, expected_slot, "slot = {slot}");
assert_eq!(actual_checkpoint, expected_checkpoint);
}
}
}
43 changes: 21 additions & 22 deletions gear-programs/checkpoint-light-client/src/wasm/sync_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,27 @@ use committee::{Error as CommitteeError, Update as CommitteeUpdate};
use gstd::debug;

pub async fn handle(state: &mut State<STORED_CHECKPOINTS_COUNT>, sync_update: SyncCommitteeUpdate) {
if eth_utils::calculate_epoch(state.finalized_header.slot) + io::sync_update::MAX_EPOCHS_GAP
<= eth_utils::calculate_epoch(sync_update.finalized_header.slot)
{
let result = HandleResult::SyncUpdate(Err(io::sync_update::Error::ReplayBackRequired {
replay_back: state
.replay_back
.as_ref()
.map(|replay_back| meta::ReplayBack {
finalized_header: replay_back.finalized_header.slot,
last_header: replay_back.last_header.slot,
}),
checkpoint: state
.checkpoints
.last()
.expect("The program should be initialized so there is a checkpoint"),
}));
msg::reply(result, 0).expect("Unable to reply with `HandleResult::SyncUpdate::Error`");

return;
}

let (finalized_header_update, committee_update) = match verify(
&state.network,
&state.finalized_header,
Expand All @@ -23,28 +44,6 @@ pub async fn handle(state: &mut State<STORED_CHECKPOINTS_COUNT>, sync_update: Sy
};

if let Some(finalized_header) = finalized_header_update {
if eth_utils::calculate_epoch(state.finalized_header.slot) + io::sync_update::MAX_EPOCHS_GAP
<= eth_utils::calculate_epoch(finalized_header.slot)
{
let result =
HandleResult::SyncUpdate(Err(io::sync_update::Error::ReplayBackRequired {
replay_back: state
.replay_back
.as_ref()
.map(|replay_back| meta::ReplayBack {
finalized_header: replay_back.finalized_header.slot,
last_header: replay_back.last_header.slot,
}),
checkpoint: state
.checkpoints
.last()
.expect("The program should be initialized so there is a checkpoint"),
}));
msg::reply(result, 0).expect("Unable to reply with `HandleResult::SyncUpdate::Error`");

return;
}

state
.checkpoints
.push(finalized_header.slot, finalized_header.tree_hash_root());
Expand Down
9 changes: 7 additions & 2 deletions relayer/src/ethereum_checkpoints/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use futures::{
};
use gclient::{EventProcessor, GearApi, WSAddress};
use parity_scale_codec::Decode;
use reqwest::Client;
use reqwest::{Client, ClientBuilder};
use tokio::{
signal::unix::{self, SignalKind},
sync::mpsc::{self, Sender},
Expand Down Expand Up @@ -41,6 +41,7 @@ pub async fn relay(args: RelayCheckpointsArgs) {
let RelayCheckpointsArgs {
program_id,
beacon_endpoint,
beacon_timeout,
vara_domain,
vara_port,
vara_suri,
Expand All @@ -59,10 +60,14 @@ pub async fn relay(args: RelayCheckpointsArgs) {
.and_then(|bytes| <[u8; 32]>::try_from(bytes).ok())
.expect("Expecting correct ProgramId");

let client_http = ClientBuilder::new()
.timeout(Duration::from_secs(beacon_timeout))
.build()
.expect("Reqwest client should be created");

let mut signal_interrupt = unix::signal(SignalKind::interrupt()).expect("Set SIGINT handler");

let (sender, mut receiver) = mpsc::channel(SIZE_CHANNEL);
let client_http = Client::new();

sync_update::spawn_receiver(client_http.clone(), beacon_endpoint.clone(), sender);

Expand Down
1 change: 1 addition & 0 deletions relayer/src/ethereum_checkpoints/replay_back.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ async fn replay_back_slots(
slots_batch_iter: SlotsBatchIter,
) -> AnyResult<()> {
for (slot_start, slot_end) in slots_batch_iter {
log::debug!("slot_start = {slot_start}, slot_end = {slot_end}");
replay_back_slots_inner(
client_http,
beacon_endpoint,
Expand Down
40 changes: 25 additions & 15 deletions relayer/src/ethereum_checkpoints/tests/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::utils::{self, slots_batch, FinalityUpdateResponse};
use super::utils::{self, slots_batch, BootstrapResponse, FinalityUpdateResponse, UpdateData};
use checkpoint_light_client::WASM_BINARY;
use checkpoint_light_client_io::{
ethereum_common::{
Expand All @@ -19,6 +19,8 @@ const FINALITY_UPDATE_5_254_112: &[u8; 4_940] =
include_bytes!("./sepolia-finality-update-5_254_112.json");
const FINALITY_UPDATE_5_263_072: &[u8; 4_941] =
include_bytes!("./sepolia-finality-update-5_263_072.json");
const UPDATE_640: &[u8; 57_202] = include_bytes!("./sepolia-update-640.json");
const BOOTSTRAP_640: &[u8; 54_328] = include_bytes!("./sepolia-bootstrap-640.json");

async fn common_upload_program(
client: &GearApi,
Expand Down Expand Up @@ -65,12 +67,13 @@ async fn init(network: Network) -> Result<()> {

// use the latest finality header as a checkpoint for bootstrapping
let finality_update = utils::get_finality_update(&client_http, RPC_URL).await?;
let current_period = eth_utils::calculate_period(finality_update.finalized_header.slot);
let slot = finality_update.finalized_header.slot;
let current_period = eth_utils::calculate_period(slot);
let mut updates = utils::get_updates(&client_http, RPC_URL, current_period, 1).await?;

println!(
"finality_update slot = {}, period = {}",
finality_update.finalized_header.slot, current_period
slot, current_period
);

let update = match updates.pop() {
Expand Down Expand Up @@ -419,11 +422,8 @@ async fn replaying_back() -> Result<()> {
Ok(())
}

#[ignore]
#[tokio::test]
async fn sync_update_requires_replaying_back() -> Result<()> {
let client_http = Client::new();

let finality_update: FinalityUpdateResponse =
serde_json::from_slice(FINALITY_UPDATE_5_263_072).unwrap();
let finality_update = finality_update.data;
Expand All @@ -433,18 +433,25 @@ async fn sync_update_requires_replaying_back() -> Result<()> {
);

let slot = finality_update.finalized_header.slot;
let current_period = eth_utils::calculate_period(slot);
let mut updates = utils::get_updates(&client_http, RPC_URL, current_period, 1).await?;
let mut updates: Vec<UpdateData> = serde_json::from_slice(UPDATE_640).unwrap();

let update = match updates.pop() {
Some(update) if updates.is_empty() => update.data,
_ => unreachable!("Requested single update"),
};

let checkpoint = update.finalized_header.tree_hash_root();
let checkpoint_hex = hex::encode(checkpoint);
let BootstrapResponse { data: bootstrap } = serde_json::from_slice(BOOTSTRAP_640).unwrap();

let checkpoint_update = update.finalized_header.tree_hash_root();
let checkpoint_bootstrap = bootstrap.header.tree_hash_root();
assert_eq!(
checkpoint_update,
checkpoint_bootstrap,
"checkpoint_update = {}, checkpoint_bootstrap = {}",
hex::encode(checkpoint_update),
hex::encode(checkpoint_bootstrap)
);

let bootstrap = utils::get_bootstrap(&client_http, RPC_URL, &checkpoint_hex).await?;
let signature = <G2 as ark_serialize::CanonicalDeserialize>::deserialize_compressed(
&update.sync_aggregate.sync_committee_signature.0 .0[..],
)
Expand Down Expand Up @@ -497,10 +504,13 @@ async fn sync_update_requires_replaying_back() -> Result<()> {

let (_message_id, payload, _value) = listener.reply_bytes_on(message_id).await?;
let result_decoded = HandleResult::decode(&mut &payload.unwrap()[..]).unwrap();
assert!(matches!(
result_decoded,
HandleResult::SyncUpdate(Err(sync_update::Error::ReplayBackRequired { .. }))
));
assert!(
matches!(
result_decoded,
HandleResult::SyncUpdate(Err(sync_update::Error::ReplayBackRequired { .. }))
),
"result_decoded = {result_decoded:?}"
);

Ok(())
}

Large diffs are not rendered by default.

Large diffs are not rendered by default.

6 changes: 5 additions & 1 deletion relayer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,14 @@ struct RelayCheckpointsArgs {
#[arg(long, env = "CHECKPOINT_LIGHT_CLIENT_ADDRESS")]
program_id: String,

/// Specify an endpoint providing Beacon API
/// Specify the endpoint providing Beacon API
#[arg(long, env = "BEACON_ENDPOINT")]
beacon_endpoint: String,

/// Specify the timeout in seconds for requests to the Beacon API endpoint
#[arg(long, default_value = "120", env = "BEACON_TIMEOUT")]
beacon_timeout: u64,

/// Domain of the VARA RPC endpoint
#[arg(long, default_value = "ws://127.0.0.1", env = "VARA_DOMAIN")]
vara_domain: String,
Expand Down
Loading