Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(block-util): use rayon for get_entry_by_id #611

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions block-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ bytes = { workspace = true }
everscale-types = { workspace = true, features = ["blake3", "rayon"] }
hex = { workspace = true }
parking_lot = { workspace = true }
rayon = { workspace = true }
thiserror = { workspace = true }
tl-proto = { workspace = true }

Expand Down
39 changes: 30 additions & 9 deletions block-util/src/archive/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
//! * Archive entry data

use std::collections::BTreeMap;
use std::sync::Arc;

use anyhow::Result;
use bytes::Bytes;
Expand Down Expand Up @@ -90,19 +91,39 @@ impl Archive {
}
}

// TODO: Make async
pub fn get_entry_by_id(
&self,
/// NOTE: Takes up to a magnitude of seconds to run on large blocks.
pub async fn get_entry_by_id(
self: &Arc<Self>,
id: &BlockId,
) -> Result<(BlockStuffAug, BlockProofStuffAug, QueueDiffStuffAug), ArchiveError> {
// TODO: Rayon go brr
let block = self.get_block_by_id(id)?;
let proof = self.get_proof_by_id(id)?;
let queue_diff = self.get_queue_diff_by_id(id)?;

Ok((block, proof, queue_diff))
let this = self.clone();
let id = *id;

let (block, proof, queue_diff) = tycho_util::sync::rayon_run(move || {
let mut block_res = None;
let mut proof_res = None;
let mut diff_res = None;
rayon::scope(|s| {
s.spawn(|_| {
proof_res = Some(this.get_proof_by_id(&id));
diff_res = Some(this.get_queue_diff_by_id(&id));
});

block_res = Some(this.get_block_by_id(&id));
});

(
block_res.expect("scope must finish"),
proof_res.expect("scope must finish"),
diff_res.expect("scope must finish"),
)
})
.await;

Ok((block?, proof?, queue_diff?))
}

/// NOTE: Takes up to a magnitude of seconds to run on large blocks.
pub fn get_block_by_id(&self, id: &BlockId) -> Result<BlockStuffAug, ArchiveError> {
let entry = self.blocks.get(id).ok_or(ArchiveError::OutOfRange)?;
entry
Expand Down
4 changes: 2 additions & 2 deletions core/src/block_strider/provider/archive_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,11 @@ impl ArchiveBlockProvider {

async fn checked_get_entry_by_id(
&self,
archive: &Archive,
archive: &Arc<Archive>,
mc_block_id: &BlockId,
block_id: &BlockId,
) -> Result<BlockStuffAug> {
let (block, ref proof, ref queue_diff) = match archive.get_entry_by_id(block_id) {
let (block, ref proof, ref queue_diff) = match archive.get_entry_by_id(block_id).await {
Ok(entry) => entry,
Err(e) => anyhow::bail!("archive is corrupted: {e:?}"),
};
Expand Down
25 changes: 14 additions & 11 deletions core/tests/archives.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::Arc;

use anyhow::Result;
use bytes::BytesMut;
use bytesize::ByteSize;
Expand Down Expand Up @@ -41,7 +43,7 @@ impl StateSubscriber for DummySubscriber {
}

pub struct ArchiveProvider {
archive: Archive,
archive: Arc<Archive>,
proof_checker: ProofChecker,
}

Expand All @@ -52,10 +54,11 @@ impl ArchiveProvider {
block_id,
} = block_id_relation;

let (ref block, ref proof, ref queue_diff) = match self.archive.get_entry_by_id(block_id) {
Ok(entry) => entry,
Err(e) => return Some(Err(e.into())),
};
let (ref block, ref proof, ref queue_diff) =
match self.archive.get_entry_by_id(block_id).await {
Ok(entry) => entry,
Err(e) => return Some(Err(e.into())),
};

match self
.proof_checker
Expand Down Expand Up @@ -249,7 +252,7 @@ async fn archives() -> Result<()> {

// Archive provider
let archive_data = utils::read_file("archive_1.bin")?;
let archive = utils::parse_archive(&archive_data)?;
let archive = utils::parse_archive(&archive_data).map(Arc::new)?;

let archive_provider = ArchiveProvider {
archive,
Expand All @@ -258,7 +261,7 @@ async fn archives() -> Result<()> {

// Next archive provider
let next_archive_data = utils::read_file("archive_2.bin")?;
let next_archive = utils::parse_archive(&next_archive_data)?;
let next_archive = utils::parse_archive(&next_archive_data).map(Arc::new)?;

let next_archive_provider = ArchiveProvider {
archive: next_archive,
Expand All @@ -267,7 +270,7 @@ async fn archives() -> Result<()> {

// Last archive provider
let last_archive_data = utils::read_file("archive_3.bin")?;
let last_archive = utils::parse_archive(&last_archive_data)?;
let last_archive = utils::parse_archive(&last_archive_data).map(Arc::new)?;

let last_archive_provider = ArchiveProvider {
archive: last_archive,
Expand Down Expand Up @@ -360,7 +363,7 @@ async fn heavy_archives() -> Result<()> {
// Archive provider
let archive_path = integration_test_path.join("archive_1.bin");
let archive_data = std::fs::read(archive_path)?;
let archive = utils::parse_archive(&archive_data)?;
let archive = utils::parse_archive(&archive_data).map(Arc::new)?;

let archive_provider = ArchiveProvider {
archive,
Expand All @@ -370,7 +373,7 @@ async fn heavy_archives() -> Result<()> {
// Next archive provider
let next_archive_path = integration_test_path.join("archive_2.bin");
let next_archive_data = std::fs::read(next_archive_path)?;
let next_archive = utils::parse_archive(&next_archive_data)?;
let next_archive = utils::parse_archive(&next_archive_data).map(Arc::new)?;

let next_archive_provider = ArchiveProvider {
archive: next_archive,
Expand All @@ -380,7 +383,7 @@ async fn heavy_archives() -> Result<()> {
// Last archive provider
let last_archive_path = integration_test_path.join("archive_3.bin");
let last_archive_data = std::fs::read(last_archive_path)?;
let last_archive = utils::parse_archive(&last_archive_data)?;
let last_archive = utils::parse_archive(&last_archive_data).map(Arc::new)?;

let last_archive_provider = ArchiveProvider {
archive: last_archive,
Expand Down
4 changes: 2 additions & 2 deletions core/tests/overlay_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ async fn overlay_server_blocks() -> Result<()> {
.build();

let archive_data = utils::read_file("archive_1.bin")?;
let archive = utils::parse_archive(&archive_data)?;
let archive = utils::parse_archive(&archive_data).map(Arc::new)?;

for block_id in archive.blocks.keys() {
if block_id.shard.is_masterchain() {
Expand All @@ -248,7 +248,7 @@ async fn overlay_server_blocks() -> Result<()> {
.await?;

let (archive_block, archive_proof, archive_queue_diff) =
archive.get_entry_by_id(block_id)?;
archive.get_entry_by_id(block_id).await?;

if let Some(block_full) = &result.data {
let block = BlockStuff::deserialize_checked(block_id, &block_full.block_data)?;
Expand Down
6 changes: 4 additions & 2 deletions core/tests/storage/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::Arc;

use anyhow::{Context, Result};
use tempfile::TempDir;
use tycho_storage::{NewBlockMeta, Storage};
Expand Down Expand Up @@ -29,10 +31,10 @@ pub(crate) async fn init_storage() -> Result<(Storage, TempDir)> {

// Init blocks
let archive_data = utils::read_file("archive_1.bin")?;
let block_provider = utils::parse_archive(&archive_data)?;
let block_provider = utils::parse_archive(&archive_data).map(Arc::new)?;

for block_id in block_provider.mc_block_ids.values() {
let (block, proof, diff) = block_provider.get_entry_by_id(block_id)?;
let (block, proof, diff) = block_provider.get_entry_by_id(block_id).await?;

let info = block.load_info().context("Failed to load block info")?;
let meta = NewBlockMeta {
Expand Down
4 changes: 1 addition & 3 deletions core/tests/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,7 @@ pub(crate) fn parse_archive(data: &[u8]) -> Result<Archive> {
let mut decompressed = Vec::new();
decoder.write(data, &mut decompressed)?;

let archive = Archive::new(decompressed)?;

Ok(archive)
Archive::new(decompressed)
}

pub(crate) fn read_file(filename: &str) -> Result<Vec<u8>> {
Expand Down
Loading