-
Notifications
You must be signed in to change notification settings - Fork 153
feat(era_manager): add era support + pre-fetching #1375
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
4f581fd
2ce7d59
a67027d
66fa50f
350994e
26d145d
5a99d28
e478b71
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,8 @@ | ||
use std::collections::HashMap; | ||
|
||
use e2store::{ | ||
era1::Era1, | ||
utils::{get_era_files, get_shuffled_era1_files, ERA1_FILE_COUNT}, | ||
utils::{get_era1_files, get_era_files, ERA1_FILE_COUNT}, | ||
}; | ||
use surf::{Client, Config}; | ||
use tokio::task::JoinHandle; | ||
|
@@ -17,81 +19,64 @@ use super::{ | |
}; | ||
|
||
pub struct EraManager { | ||
current_block_number: u64, | ||
next_block_number: u64, | ||
current_era: Option<ProcessedEra>, | ||
next_era: Option<JoinHandle<ProcessedEra>>, | ||
next_era: Option<JoinHandle<anyhow::Result<ProcessedEra>>>, | ||
http_client: Client, | ||
era1_files: Vec<String>, | ||
era1_files: HashMap<u64, String>, | ||
era_files: HashMap<u64, String>, | ||
} | ||
|
||
impl EraManager { | ||
pub async fn new(starting_block: u64) -> anyhow::Result<Self> { | ||
pub async fn new(next_block_number: u64) -> anyhow::Result<Self> { | ||
let http_client: Client = Config::new() | ||
.add_header("Content-Type", "application/xml") | ||
.expect("to be able to add header") | ||
.try_into()?; | ||
let era1_files = get_shuffled_era1_files(&http_client).await?; | ||
let era1_files = get_era1_files(&http_client).await?; | ||
let era_files = get_era_files(&http_client).await?; | ||
|
||
let mut era_manager = Self { | ||
current_block_number: starting_block, | ||
next_block_number, | ||
current_era: None, | ||
next_era: None, | ||
http_client, | ||
era1_files, | ||
era_files, | ||
}; | ||
|
||
// initialize the current era | ||
let current_era = era_manager.fetch_era_file().await?; | ||
let _ = era_manager.current_era.insert(current_era); | ||
// initialize the next era file | ||
// The first time using era_manager we need to find the current era file we are starting | ||
// from If the block is from a era file we will need to binary search to find which | ||
// era file contains the block we want | ||
let era1_files = era_manager.era1_files.clone(); | ||
let http_client = era_manager.http_client.clone(); | ||
let join_handle = tokio::spawn(async move { | ||
Self::init_current_era(era1_files, http_client, next_block_number).await | ||
|
||
}); | ||
era_manager.next_era = Some(join_handle); | ||
|
||
Ok(era_manager) | ||
} | ||
|
||
pub fn current_block_number(&self) -> u64 { | ||
self.current_block_number | ||
} | ||
|
||
pub async fn get_current_block(&mut self) -> anyhow::Result<&ProcessedBlock> { | ||
let Some(current_era) = &self.current_era else { | ||
panic!("current_era should be initialized in EraManager::new"); | ||
}; | ||
Ok(current_era.get_block(self.current_block_number)) | ||
pub fn next_block_number(&self) -> u64 { | ||
self.next_block_number | ||
} | ||
|
||
pub async fn get_next_block(&mut self) -> anyhow::Result<&ProcessedBlock> { | ||
self.current_block_number += 1; | ||
let processed_era = match &self.current_era { | ||
Some(processed_era) if processed_era.contains_block(self.current_block_number) => { | ||
Some(processed_era) if processed_era.contains_block(self.next_block_number) => { | ||
self.current_era.as_ref().expect("current_era to be some") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: isn't this the same as There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am a little confused what this is implying There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I mean, can't this be simplified to: let processed_era = match &self.current_era {
Some(processed_era) if processed_era.contains_block(block_number) => {
processed_era
}
...
}; There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I tried this but it gets mad at me saying *self is used, but then mut self is used |
||
} | ||
_ => { | ||
let current_era = self.fetch_era_file().await?; | ||
self.current_era.insert(current_era) | ||
} | ||
}; | ||
Ok(processed_era.get_block(self.current_block_number)) | ||
} | ||
let block = processed_era.get_block(self.next_block_number); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: It should, but I think we can't be sure that It's better to check than panic (in my opinion), so I would add |
||
self.next_block_number += 1; | ||
|
||
async fn fetch_era_file_link( | ||
&self, | ||
block_number: u64, | ||
epoch_index: u64, | ||
) -> anyhow::Result<String> { | ||
match EraType::for_block_number(block_number) { | ||
EraType::Era1 => { | ||
let era1_file = self | ||
.era1_files | ||
.iter() | ||
.find(|file| file.contains(&format!("mainnet-{epoch_index:05}-"))) | ||
.expect("to be able to find era file") | ||
.clone(); | ||
Ok(era1_file) | ||
} | ||
EraType::Era => { | ||
let era_files = get_era_files(&self.http_client).await?; | ||
Ok(era_files[&(epoch_index)].clone()) | ||
} | ||
} | ||
Ok(block) | ||
} | ||
|
||
async fn fetch_era_file(&mut self) -> anyhow::Result<ProcessedEra> { | ||
|
||
|
@@ -101,30 +86,29 @@ impl EraManager { | |
let current_era = self.next_era.take(); | ||
|
||
let current_era = match current_era { | ||
Some(era_handle) => era_handle.await?, | ||
None => self.init_current_era(self.current_block_number).await?, | ||
}; | ||
None => return Err(anyhow::anyhow!("EraManager not initialized")), | ||
}?; | ||
|
||
// Download the next era file | ||
let mut next_era_type = current_era.era_type; | ||
let mut next_epoch_index = current_era.epoch_index + 1; | ||
|
||
// Handle transition from era1 to era | ||
if current_era.era_type == EraType::Era1 && next_epoch_index == ERA1_FILE_COUNT as u64 { | ||
if next_era_type == EraType::Era1 && next_epoch_index == ERA1_FILE_COUNT as u64 { | ||
next_era_type = EraType::Era; | ||
next_epoch_index = FIRST_ERA_EPOCH_WITH_EXECUTION_PAYLOAD; | ||
} | ||
let next_block_number = current_era.first_block_number + current_era.len() as u64; | ||
let next_era_path = self | ||
.fetch_era_file_link(next_block_number, next_epoch_index) | ||
.await?; | ||
let era_type = EraType::for_block_number(next_block_number); | ||
let Some(next_era_path) = (match next_era_type { | ||
EraType::Era1 => self.era1_files.get(&next_epoch_index).cloned(), | ||
EraType::Era => self.era_files.get(&next_epoch_index).cloned(), | ||
}) else { | ||
return Err(anyhow::anyhow!("Unable to find next era file's path: index {next_epoch_index} type {next_era_type:?}")); | ||
|
||
}; | ||
let http_client = self.http_client.clone(); | ||
let join_handle = tokio::spawn(async move { | ||
let raw_era = download_raw_era(next_era_path, http_client.clone()) | ||
.await | ||
.expect("to be able to download raw era"); | ||
match era_type { | ||
EraType::Era1 => process_era1_file(raw_era, next_epoch_index) | ||
.expect("to be able to process era1 file"), | ||
EraType::Era => process_era_file(raw_era, next_epoch_index) | ||
.expect("to be able to process era file"), | ||
let raw_era = download_raw_era(next_era_path, http_client.clone()).await?; | ||
match next_era_type { | ||
EraType::Era1 => process_era1_file(raw_era, next_epoch_index), | ||
EraType::Era => process_era_file(raw_era, next_epoch_index), | ||
} | ||
}); | ||
self.next_era = Some(join_handle); | ||
|
@@ -135,15 +119,23 @@ impl EraManager { | |
/// The first time using era_manager we need to find the current era file we are starting from | ||
/// If the block is from a era file we will need to binary search to find which era file | ||
/// contains the block we want | ||
async fn init_current_era(&mut self, block_number: u64) -> anyhow::Result<ProcessedEra> { | ||
async fn init_current_era( | ||
era1_files: HashMap<u64, String>, | ||
http_client: Client, | ||
block_number: u64, | ||
) -> anyhow::Result<ProcessedEra> { | ||
if let EraType::Era1 = EraType::for_block_number(block_number) { | ||
let epoch_index = Era1::epoch_number_from_block_number(block_number); | ||
let era_path = self.fetch_era_file_link(block_number, epoch_index).await?; | ||
let raw_era1 = download_raw_era(era_path, self.http_client.clone()).await?; | ||
let Some(era1_path) = era1_files.get(&epoch_index).cloned() else { | ||
return Err(anyhow::anyhow!( | ||
|
||
"Unable to find era1 file's path during initialization: index {epoch_index}" | ||
)); | ||
}; | ||
let raw_era1 = download_raw_era(era1_path, http_client.clone()).await?; | ||
|
||
return process_era1_file(raw_era1, epoch_index); | ||
} | ||
|
||
EraBinarySearch::find_era_file(self.http_client.clone(), block_number).await | ||
EraBinarySearch::find_era_file(http_client.clone(), block_number).await | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -370,6 +370,8 @@ mod tests { | |
let raw_era1 = fs::read("../test_assets/era1/mainnet-00000-5ec1ffb8.era1").unwrap(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think we should be using both raw file and EraManager (it should be one or the other). I would say we shouldn't use EraManager in tests if we can avoid it (because EraManager would download from public sources). I think we can obtain ProcessedEra directly from era::utils::process_era1_file(raw_era1, /* epoch_index= */ 0) |
||
for block_tuple in Era1::iter_tuples(raw_era1) { | ||
if block_tuple.header.header.number == 0 { | ||
// skip genesis block | ||
era_manager.get_next_block().await.unwrap(); | ||
continue; | ||
} | ||
state | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Current implementation doesn't support fetching blocks out of order, right?
If that's the case, I would make that more explicit, for example:
starting_block
in constructorget_block_by_number
toget_next_block
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fetching epochs out of order isn't supported
fetching blocks within the current epoch is supported,
If we wanted to make it more explicit I would want
get_current_block
andget_next_block
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In that case yes, I would be more explicit:
pub fn new(starting_block)
and start fetching corresponding era in the background.current_block
andnext_block
(orget_*
variants, whatever you prefer)current_block_number
We would lose this feature, but without fetching epochs our of order doesn't feel like a big loss.