Skip to content

Commit

Permalink
feature(collator): persistent cache for diffs
Browse files Browse the repository at this point in the history
  • Loading branch information
drmick committed Feb 23, 2025
1 parent 1bc4ef4 commit d284c57
Show file tree
Hide file tree
Showing 18 changed files with 892 additions and 180 deletions.
Empty file added 11.log
Empty file.
2 changes: 1 addition & 1 deletion block-util/src/queue/proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ pub struct QueueKey {
}

impl QueueKey {
const SIZE_HINT: usize = 8 + 32;
pub const SIZE_HINT: usize = 8 + 32;

pub const MIN: Self = Self {
lt: 0,
Expand Down
68 changes: 56 additions & 12 deletions collator/src/collator/do_collate/finalize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,17 @@ use crate::collator::types::{
BlockCollationData, ExecuteResult, FinalizeBlockResult, FinalizeMessagesReaderResult,
PreparedInMsg, PreparedOutMsg,
};
use crate::internal_queue::types::EnqueuedMessage;
use crate::internal_queue::types::{
DiffStatistics, EnqueuedMessage, PartitionRouter, QueueShardRange,
};
use crate::queue_adapter::MessageQueueAdapter;
use crate::state_node::StateNodeAdapter;
use crate::tracing_targets;
use crate::types::processed_upto::{ProcessedUptoInfoExtension, ProcessedUptoInfoStuff};
use crate::types::{BlockCandidate, CollationSessionInfo, CollatorConfig, McData, ShardHashesExt};
use crate::types::{
BlockCandidate, CollationSessionInfo, CollatorConfig, McData, ShardDescriptionExt,
ShardHashesExt,
};
use crate::utils::block::detect_top_processed_to_anchor;

pub struct FinalizeState {
Expand All @@ -51,10 +57,11 @@ pub struct FinalizeBlockContext {
}

impl Phase<FinalizeState> {
pub fn finalize_messages_reader(
pub async fn finalize_messages_reader(
&mut self,
messages_reader: MessagesReader,
mq_adapter: Arc<dyn MessageQueueAdapter<EnqueuedMessage>>,
state_node_adapter: Arc<dyn StateNodeAdapter>,
) -> Result<(
FinalizeMessagesReaderResult,
impl FnOnce() -> Result<Duration>,
Expand All @@ -75,29 +82,66 @@ impl Phase<FinalizeState> {
.collation_data
.top_shard_blocks
.iter()
.map(|b| (b.block_id.shard, b.block_id.seqno))
.map(|b| b.block_id)
.collect()
} else {
let mut top_blocks: FastHashMap<ShardIdent, u32> = self
let mut top_blocks: Vec<BlockId> = self
.state
.mc_data
.shards
.iter()
.filter(|(shard, descr)| {
descr.top_sc_block_updated && shard != &self.state.shard_id
})
.map(|(shard_ident, descr)| (*shard_ident, descr.seqno))
.map(|(shard_ident, descr)| descr.get_block_id(*shard_ident))
.collect();

top_blocks.insert(
self.state.mc_data.block_id.shard,
self.state.mc_data.block_id.seqno,
);
top_blocks.push(self.state.mc_data.block_id);

top_blocks
};

let diffs = mq_adapter.get_diffs(top_shard_blocks);
let mut diffs_info = FastHashMap::default();

for top_shard_block in top_shard_blocks.iter() {
let diff: QueueDiffStuff = state_node_adapter
.load_diff(&top_shard_block)
.await?
.ok_or_else(|| anyhow!("Top shard block diff not found"))?;
let partition_router = PartitionRouter::with_partitions(
&diff.diff().router_partitions_src,
&diff.diff().router_partitions_dst,
);

// TODO use dynamic shards
let range_mc = QueueShardRange {
shard_ident: ShardIdent::MASTERCHAIN,
from: diff.diff().min_message,
to: diff.diff().max_message,
};

let range_shard = QueueShardRange {
shard_ident: ShardIdent::new_full(0),
from: diff.diff().min_message,
to: diff.diff().max_message,
};

let ranges = vec![range_mc, range_shard];
let queue_statistic = mq_adapter.get_statistics(0, &ranges)?;

let mut diff_statistics = FastHashMap::default();
diff_statistics.insert(0, queue_statistic.statistics().clone());

let diff_statistic = DiffStatistics::new(
top_shard_block.shard,
diff.diff().min_message,
diff.diff().max_message,
diff_statistics,
queue_statistic.shard_messages_count(),
);

diffs_info.insert(top_shard_block.shard, (partition_router, diff_statistic));
}

// get queue diff and check for pending internals
let create_queue_diff_elapsed;
Expand All @@ -112,7 +156,7 @@ impl Phase<FinalizeState> {
&labels,
);
let finalize_message_reader_res =
messages_reader.finalize(self.extra.executor.min_next_lt(), diffs)?;
messages_reader.finalize(self.extra.executor.min_next_lt(), diffs_info)?;
create_queue_diff_elapsed = histogram_create_queue_diff.finish();
finalize_message_reader_res
};
Expand Down
43 changes: 31 additions & 12 deletions collator/src/collator/do_collate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use crate::collator::do_collate::finalize::FinalizeBlockContext;
use crate::collator::types::RandSeed;
use crate::internal_queue::types::EnqueuedMessage;
use crate::queue_adapter::MessageQueueAdapter;
use crate::state_node::StateNodeAdapter;
use crate::tracing_targets;
use crate::types::{
BlockCollationResult, BlockIdExt, CollationSessionInfo, CollatorConfig,
Expand Down Expand Up @@ -128,20 +129,25 @@ impl CollatorStdImpl {
let collation_session = self.collation_session.clone();
let config = self.config.clone();
let mq_adapter = self.mq_adapter.clone();
let state_node_adapter = self.state_node_adapter.clone();
let span = tracing::Span::current();
move || {
let _span = span.enter();

Self::run(
config,
mq_adapter,
reader_state,
anchors_cache,
state,
collation_session,
wu_used_from_last_anchor,
usage_tree,
)
tokio::runtime::Handle::current().block_on(async {
Self::run(
config,
mq_adapter,
state_node_adapter,
reader_state,
anchors_cache,
state,
collation_session,
wu_used_from_last_anchor,
usage_tree,
)
.await
})
}
})
.await?;
Expand Down Expand Up @@ -224,9 +230,10 @@ impl CollatorStdImpl {
}

#[allow(clippy::too_many_arguments)]
fn run(
async fn run(
collator_config: Arc<CollatorConfig>,
mq_adapter: Arc<dyn MessageQueueAdapter<EnqueuedMessage>>,
state_node_adapter: Arc<dyn StateNodeAdapter>,
reader_state: ReaderState,
anchors_cache: AnchorsCache,
state: Box<ActualState>,
Expand All @@ -238,6 +245,7 @@ impl CollatorStdImpl {
let labels = [("workchain", shard_id.workchain().to_string())];
let mc_data = state.mc_data.clone();

let block_id = state.collation_data.block_id_short.clone();
// prepare execution
let histogram_prepare =
HistogramGuard::begin_with_labels("tycho_do_collate_prepare_time", &labels);
Expand Down Expand Up @@ -297,7 +305,9 @@ impl CollatorStdImpl {
create_queue_diff_elapsed,
},
update_queue_task,
) = finalize_phase.finalize_messages_reader(messages_reader, mq_adapter.clone())?;
) = finalize_phase
.finalize_messages_reader(messages_reader, mq_adapter.clone(), state_node_adapter)
.await?;

let finalize_block_timer = std::time::Instant::now();

Expand Down Expand Up @@ -351,6 +361,15 @@ impl CollatorStdImpl {
&min_processed_to.unwrap_or_default().next_value(),
) + 1;

tracing::info!(target: "local_debug",
"block tail len {:?} {} {:?}",
block_id,
diff_tail_len,
min_processed_to
);

// let diff_tail_len = 1;

let span = tracing::Span::current();
let (finalize_phase_result, update_queue_task_result) = rayon::join(
|| {
Expand Down
22 changes: 10 additions & 12 deletions collator/src/collator/messages_reader/internals_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ impl InternalsPartitionReader {
while current_block_seqno < self.block_seqno {
let diff = self
.mq_adapter
.get_diff(self.for_shard_id, current_block_seqno)
.get_diff(&self.for_shard_id, current_block_seqno)?
.ok_or_else(|| {
anyhow!(
"cannot get diff for block {}:{}",
Expand All @@ -425,9 +425,7 @@ impl InternalsPartitionReader {
)
})?;

messages_count += diff
.statistics()
.get_messages_count_by_shard(&self.for_shard_id);
messages_count += diff.get_messages_count_by_shard(&self.for_shard_id);

if messages_count > max_messages as u64 {
break;
Expand All @@ -446,14 +444,14 @@ impl InternalsPartitionReader {

let shard_range_to = if shard_id == self.for_shard_id {
if range_seqno != self.block_seqno {
let diff =
self.mq_adapter
.get_diff(shard_id, range_seqno)
.ok_or_else(|| {
anyhow!("cannot get diff for block {shard_id}:{range_seqno}")
})?;

*diff.max_message()
let diff = self
.mq_adapter
.get_diff(&shard_id, range_seqno)?
.ok_or_else(|| {
anyhow!("cannot get diff for block {shard_id}:{range_seqno}")
})?;

diff.max_message
} else {
QueueKey::max_for_lt(self.prev_state_gen_lt)
}
Expand Down
20 changes: 9 additions & 11 deletions collator/src/collator/messages_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use anyhow::{Context, Result};
use everscale_types::cell::HashBytes;
use everscale_types::models::{MsgsExecutionParams, ShardIdent};
use tycho_block_util::queue::{QueueKey, QueuePartitionIdx};
use tycho_util::FastHashSet;
use tycho_util::{FastHashMap, FastHashSet};

use self::externals_reader::*;
use self::internals_reader::*;
Expand All @@ -15,9 +15,8 @@ pub(super) use self::reader_state::*;
use super::messages_buffer::{DisplayMessageGroup, MessageGroup, MessagesBufferLimits};
use super::types::{AnchorsCache, MsgsExecutionParamsExtension};
use crate::collator::messages_buffer::DebugMessageGroup;
use crate::internal_queue::queue::ShortQueueDiff;
use crate::internal_queue::types::{
EnqueuedMessage, PartitionRouter, QueueDiffWithMessages, QueueStatistics,
DiffStatistics, EnqueuedMessage, PartitionRouter, QueueDiffWithMessages, QueueStatistics,
};
use crate::queue_adapter::MessageQueueAdapter;
use crate::tracing_targets;
Expand Down Expand Up @@ -367,7 +366,7 @@ impl MessagesReader {
pub fn finalize(
mut self,
current_next_lt: u64,
diffs: Vec<(ShardIdent, ShortQueueDiff)>,
diffs_info: FastHashMap<ShardIdent, (PartitionRouter, DiffStatistics)>,
) -> Result<FinalizedMessagesReader> {
let mut has_unprocessed_messages = self.has_messages_in_buffers()
|| self.has_pending_new_messages()
Expand Down Expand Up @@ -435,7 +434,7 @@ impl MessagesReader {
&mut queue_diff_with_msgs.partition_router,
aggregated_stats,
self.for_shard_id,
diffs,
diffs_info,
)?;

// metrics: accounts count in isolated partitions
Expand Down Expand Up @@ -498,7 +497,7 @@ impl MessagesReader {
partition_router: &mut PartitionRouter,
aggregated_stats: QueueStatistics,
for_shard_id: ShardIdent,
top_block_diffs: Vec<(ShardIdent, ShortQueueDiff)>,
diffs_info: FastHashMap<ShardIdent, (PartitionRouter, DiffStatistics)>,
) -> Result<FastHashSet<HashBytes>> {
let par_0_msgs_count_limit = msgs_exec_params.par_0_int_msgs_count_limit as u64;
let mut moved_from_par_0_accounts = FastHashSet::default();
Expand Down Expand Up @@ -531,7 +530,7 @@ impl MessagesReader {
dest_int_address,
);
// if we have account for another shard then take info from that shard
let acc_shard_diff_info = top_block_diffs
let acc_shard_diff_info = diffs_info
.iter()
.find(|(shard_id, _)| shard_id.contains_address(&dest_int_address))
.map(|(_, diff)| diff);
Expand All @@ -546,14 +545,13 @@ impl MessagesReader {
);
msgs_count
}
Some(diff) => {
Some((router, statistics)) => {
tracing::trace!(target: tracing_targets::COLLATOR,
"use diff for address {} because we have diff",
dest_int_address,
);
// getting remote shard partition from diff
let remote_shard_partition =
diff.router().get_partition(None, &dest_int_address);
let remote_shard_partition = router.get_partition(None, &dest_int_address);

tracing::trace!(target: tracing_targets::COLLATOR,
"remote shard partition for address {} is {}",
Expand All @@ -571,7 +569,7 @@ impl MessagesReader {
}

// if remote partition == 0 then we need to check statistics
let remote_msgs_count = match diff.statistics().partition(0) {
let remote_msgs_count = match statistics.partition(0) {
None => {
tracing::trace!(target: tracing_targets::COLLATOR,
"use aggregated stats for address {} because we do not have partition 0 stats in diff",
Expand Down
3 changes: 2 additions & 1 deletion collator/src/collator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ mod messages_reader;
mod types;

pub use error::CollationCancelReason;
use tycho_util::FastHashMap;
pub use types::ForceMasterCollation;

#[cfg(test)]
Expand Down Expand Up @@ -1315,7 +1316,7 @@ impl CollatorStdImpl {
mut reader_state, ..
} = messages_reader.finalize(
0, // can pass 0 because new messages reader was not initialized in this case
vec![],
FastHashMap::default(),
)?;
std::mem::swap(&mut working_state.reader_state, &mut reader_state);

Expand Down
Loading

0 comments on commit d284c57

Please sign in to comment.