-
Notifications
You must be signed in to change notification settings - Fork 153
feat(era_manager): add era support + pre-fetching #1375
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
4f581fd
2ce7d59
a67027d
66fa50f
350994e
26d145d
5a99d28
e478b71
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -49,12 +49,28 @@ impl Era { | |
let slot_index_state = SlotIndexStateEntry::try_from(&file.entries[entries_length - 1])?; | ||
|
||
// Iterate over the block entries. Skip the first and last 3 entries. | ||
let mut next_slot = slot_index_block.slot_index.starting_slot; | ||
for idx in 1..entries_length - 3 { | ||
let entry = &file.entries[idx]; | ||
let fork = get_beacon_fork(next_slot); | ||
let slot_indexes = slot_index_block | ||
.slot_index | ||
.indices | ||
.iter() | ||
.enumerate() | ||
.filter_map(|(i, index)| { | ||
if *index != 0 { | ||
Some(slot_index_block.slot_index.starting_slot + i as u64) | ||
} else { | ||
None | ||
} | ||
}) | ||
.collect::<Vec<u64>>(); | ||
|
||
ensure!( | ||
slot_indexes.len() == entries_length - 4, | ||
"invalid slot index block: incorrect count" | ||
); | ||
for (index, slot) in slot_indexes.into_iter().enumerate() { | ||
let entry = &file.entries[index + 1]; | ||
let fork = get_beacon_fork(slot); | ||
let beacon_block = CompressedSignedBeaconBlock::try_from(entry, fork)?; | ||
next_slot = beacon_block.block.slot() + 1; | ||
blocks.push(beacon_block); | ||
} | ||
let fork = get_beacon_fork(slot_index_state.slot_index.starting_slot); | ||
|
@@ -80,6 +96,44 @@ impl Era { | |
Ok(era_state.state) | ||
} | ||
|
||
/// Iterate over beacon blocks. | ||
/// | ||
/// Deserializes `raw_era` with `E2StoreMemory::deserialize`, reads the block slot index from | ||
/// the second-to-last entry, and returns an iterator that decodes one | ||
/// `CompressedSignedBeaconBlock` per non-zero index. Decoding happens inside the returned | ||
/// iterator, i.e. only as items are pulled from it. | ||
/// | ||
/// # Errors | ||
/// | ||
/// The outer `Result` is an error if deserialization fails, if the second-to-last entry cannot | ||
/// be converted into a `SlotIndexBlockEntry`, or if the number of non-zero indices differs from | ||
/// `entries_length - 4`. Each iterator item is an error if the corresponding entry cannot be | ||
/// converted into a `CompressedSignedBeaconBlock`. | ||
/// | ||
/// NOTE(review): `get_beacon_fork` is called for every yielded slot; confirm how it behaves for | ||
/// slots of forks it does not support. | ||
pub fn iter_blocks( | ||
raw_era: Vec<u8>, | ||
) -> anyhow::Result<impl Iterator<Item = anyhow::Result<CompressedSignedBeaconBlock>>> { | ||
let file = E2StoreMemory::deserialize(&raw_era)?; | ||
let entries_length = file.entries.len(); | ||
// The block slot index is taken from the second-to-last entry. | ||
// NOTE(review): nothing here checks that the file has at least 4 entries before the | ||
// `entries_length - 2` / `entries_length - 4` arithmetic and the indexing below; confirm | ||
// that `deserialize` guarantees this, otherwise a short file can panic instead of erroring. | ||
let block_index = | ||
SlotIndexBlockEntry::try_from(&file.entries[entries_length - 2])?.slot_index; | ||
|
||
// Keep only the positions whose index is non-zero; position `i` maps to slot | ||
// `starting_slot + i`. | ||
let slot_indexes = block_index | ||
|
||
.indices | ||
.iter() | ||
.enumerate() | ||
.filter_map(|(i, index)| { | ||
if *index != 0 { | ||
Some(block_index.starting_slot + i as u64) | ||
} else { | ||
None | ||
} | ||
}) | ||
.collect::<Vec<u64>>(); | ||
|
||
// Blocks are read from `entries[index + 1]` below, so the populated slots must account for | ||
// every entry except four: the first entry and the last three are not read as blocks. | ||
ensure!( | ||
slot_indexes.len() == entries_length - 4, | ||
"invalid slot index block: incorrect count" | ||
); | ||
|
||
// The closure takes ownership of `file` (`move`) and clones one entry per yielded item; | ||
// the fork used for decoding is derived from the slot recorded in the index. | ||
Ok(slot_indexes | ||
.into_iter() | ||
.enumerate() | ||
.map(move |(index, slot)| { | ||
let entry: Entry = file.entries[index + 1].clone(); | ||
let fork = get_beacon_fork(slot); | ||
let beacon_block = CompressedSignedBeaconBlock::try_from(&entry, fork)?; | ||
Ok(beacon_block) | ||
})) | ||
} | ||
|
||
#[allow(dead_code)] | ||
fn write(&self) -> anyhow::Result<Vec<u8>> { | ||
let mut entries: Vec<Entry> = vec![]; | ||
|
@@ -102,6 +156,17 @@ impl Era { | |
file.write(&mut buf)?; | ||
Ok(buf) | ||
} | ||
|
||
/// Returns `true` if `block_number` lies within the inclusive range spanned by the execution | ||
/// block numbers of the first and last stored blocks. | ||
/// | ||
/// Returns `false` when no blocks are stored. Only the two endpoints are inspected, so this | ||
/// does not verify that a block with exactly that number is present. | ||
/// NOTE(review): assumes `self.blocks` is ordered by ascending execution block number; | ||
/// confirm against how `blocks` is populated. | ||
pub fn contains(&self, block_number: u64) -> bool { | ||
// Guard the indexing below: an empty block list contains nothing. | ||
if self.blocks.is_empty() { | ||
return false; | ||
} | ||
let first_block_number = self.blocks[0].block.execution_block_number(); | ||
let last_block_number = self.blocks[self.blocks.len() - 1] | ||
.block | ||
.execution_block_number(); | ||
(first_block_number..=last_block_number).contains(&block_number) | ||
} | ||
} | ||
|
||
#[derive(Clone, PartialEq, Debug)] | ||
|
@@ -110,7 +175,7 @@ pub struct CompressedSignedBeaconBlock { | |
} | ||
|
||
impl CompressedSignedBeaconBlock { | ||
fn try_from(entry: &Entry, fork: ForkName) -> Result<Self, anyhow::Error> { | ||
pub fn try_from(entry: &Entry, fork: ForkName) -> Result<Self, anyhow::Error> { | ||
ensure!( | ||
entry.header.type_ == 0x01, | ||
"invalid compressed signed beacon block entry: incorrect header type" | ||
|
@@ -257,7 +322,7 @@ impl TryFrom<Entry> for SlotIndexBlock { | |
// slot-index := starting-slot | index | index | index ... | count | ||
#[derive(Clone, Eq, PartialEq, Debug)] | ||
pub struct SlotIndexStateEntry { | ||
slot_index: SlotIndexState, | ||
pub slot_index: SlotIndexState, | ||
} | ||
|
||
impl TryFrom<&Entry> for SlotIndexStateEntry { | ||
|
@@ -304,7 +369,7 @@ impl TryInto<Entry> for SlotIndexStateEntry { | |
|
||
#[derive(Clone, Eq, PartialEq, Debug)] | ||
pub struct SlotIndexState { | ||
starting_slot: u64, | ||
pub starting_slot: u64, | ||
indices: [u64; 1], | ||
count: u64, | ||
} | ||
|
@@ -326,7 +391,7 @@ impl TryFrom<Entry> for SlotIndexState { | |
} | ||
} | ||
|
||
fn get_beacon_fork(slot_index: u64) -> ForkName { | ||
pub fn get_beacon_fork(slot_index: u64) -> ForkName { | ||
if slot_index < 4_636_672 { | ||
panic!("e2store/era doesn't support this fork"); | ||
} else if (4_636_672..6_209_536).contains(&slot_index) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,12 +1,12 @@ | ||
use std::io; | ||
use std::{collections::HashMap, io}; | ||
|
||
use anyhow::{anyhow, ensure, Error}; | ||
use rand::{seq::SliceRandom, thread_rng}; | ||
use scraper::{Html, Selector}; | ||
use surf::Client; | ||
|
||
const ERA1_DIR_URL: &str = "https://era1.ethportal.net/"; | ||
const ERA1_FILE_COUNT: usize = 1897; | ||
pub const ERA1_FILE_COUNT: usize = 1897; | ||
|
||
/// Fetches era1 files hosted on era1.ethportal.net and shuffles them | ||
pub async fn get_shuffled_era1_files(http_client: &Client) -> anyhow::Result<Vec<String>> { | ||
|
@@ -47,3 +47,54 @@ pub fn underlying_io_error_kind(error: &Error) -> Option<io::ErrorKind> { | |
} | ||
None | ||
} | ||
|
||
const ERA_DIR_URL: &str = "https://mainnet.era.nimbus.team/"; | ||
|
||
/// Fetches era file download links | ||
pub async fn get_era_file_download_links( | ||
|
||
http_client: &Client, | ||
) -> anyhow::Result<HashMap<u64, String>> { | ||
let index_html = http_client | ||
.get(ERA_DIR_URL) | ||
.recv_string() | ||
.await | ||
.map_err(|e| anyhow!("{e}"))?; | ||
let index_html = Html::parse_document(&index_html); | ||
let selector = Selector::parse("a[href*='mainnet-']").expect("to be able to parse selector"); | ||
let era_files: HashMap<u64, String> = index_html | ||
.select(&selector) | ||
.map(|element| { | ||
let href = element | ||
.value() | ||
.attr("href") | ||
.expect("to be able to get href"); | ||
let epoch_index = href | ||
.split('-') | ||
.nth(1) | ||
.expect("to be able to get epoch") | ||
.parse::<u64>() | ||
.expect("to be able to parse epoch"); | ||
(epoch_index, format!("{ERA_DIR_URL}{href}")) | ||
}) | ||
.collect(); | ||
Ok(era_files) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. nit: what do you think about checking that the keys start from 0 and are consecutive? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I am not sure what you mean. Do you want me to loop from 0 to the max era count and see if all of them exist? Since we are using a HashMap, the values are stored at positions determined by the hash of the keys, so they wouldn't be in sorted order if we were to iterate over the buckets in order. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think looping and checking is simple enough. Something like: ensure!(
(0..era_files.len() as u64).all(|epoch| era_files.contains_key(&epoch)),
"Epoch indices are not starting from zero or not consecutive",
); You can also calculate the min and max and check that they are as desired (this still requires looping, so there is no saving performance-wise). |
||
} | ||
|
||
// NOTE(review): both tests build a real `Client` and await the fetch helpers, so they | ||
// presumably need network access to the remote directory listings; confirm before relying | ||
// on them in an offline environment. | ||
#[cfg(test)] | ||
mod tests { | ||
use super::*; | ||
|
||
// The era1 listing is expected to contain exactly `ERA1_FILE_COUNT` files. | ||
#[tokio::test] | ||
async fn test_get_shuffled_era1_files() { | ||
let http_client = Client::new(); | ||
let era1_files = get_shuffled_era1_files(&http_client).await.unwrap(); | ||
assert_eq!(era1_files.len(), ERA1_FILE_COUNT); | ||
} | ||
|
||
// Only checks that at least one era download link was returned; the exact count is not pinned. | ||
#[tokio::test] | ||
async fn test_get_era_file_download_links() { | ||
let http_client = Client::new(); | ||
let era_files = get_era_file_download_links(&http_client).await.unwrap(); | ||
assert!(!era_files.is_empty()); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -174,6 +174,17 @@ impl SignedBeaconBlock { | |
SignedBeaconBlock::Deneb(block) => block.message.slot, | ||
} | ||
} | ||
|
||
/// Returns execution block number. | ||
pub fn execution_block_number(&self) -> u64 { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: it feels a bit weird to be this specific and return the execution block number directly from There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This would make us do a match on match executionpayload { I think the current implementation is the cleanest, but if this is a major concern we can discuss it more |
||
match self { | ||
SignedBeaconBlock::Bellatrix(block) => { | ||
block.message.body.execution_payload.block_number | ||
} | ||
SignedBeaconBlock::Capella(block) => block.message.body.execution_payload.block_number, | ||
SignedBeaconBlock::Deneb(block) => block.message.body.execution_payload.block_number, | ||
} | ||
} | ||
} | ||
|
||
#[cfg(test)] | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I think this comment is no longer relevant. If you want, move it down next to the `ensure!` to explain why there is a `- 4`, but rephrase it in that case.