Skip to content

Commit

Permalink
feature(collator): change diffs tail calculation
Browse files Browse the repository at this point in the history
  • Loading branch information
drmick committed Feb 20, 2025
1 parent aa50d98 commit 1bc4ef4
Show file tree
Hide file tree
Showing 12 changed files with 276 additions and 76 deletions.
9 changes: 4 additions & 5 deletions collator/src/collator/do_collate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,11 +346,10 @@ impl CollatorStdImpl {
&mc_data.shards_processed_to,
);

// trim outdated diffs and calc queue diffs tail lenght
if let Some(value) = min_processed_to {
mq_adapter.trim_diffs(&shard_id, &value)?;
};
let diff_tail_len = mq_adapter.get_diffs_count_by_shard(&shard_id) as u32 + 1;
let diff_tail_len = mq_adapter.get_diffs_tail_len(
&shard_id,
&min_processed_to.unwrap_or_default().next_value(),
) + 1;

let span = tracing::Span::current();
let (finalize_phase_result, update_queue_task_result) = rayon::join(
Expand Down
56 changes: 24 additions & 32 deletions collator/src/internal_queue/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,8 @@ where
fn commit_diff(&self, mc_top_blocks: &[(BlockId, bool)]) -> Result<()>;
/// remove all data in uncommitted state storage
fn clear_uncommitted_state(&self) -> Result<()>;
/// Returns the number of diffs in cache for the given shard
fn get_diffs_count_by_shard(&self, shard_ident: &ShardIdent) -> usize;
/// Removes all diffs from the cache that are less than `inclusive_until` which source shard is `source_shard`
fn trim_diffs(&self, source_shard: &ShardIdent, inclusive_until: &QueueKey) -> Result<()>;
/// Returns the diffs tail len for the given shard
fn get_diffs_tail_len(&self, shard_ident: &ShardIdent, from: &QueueKey) -> u32;
/// Load statistics for the given range by accounts
fn load_statistics(
&self,
Expand Down Expand Up @@ -290,10 +288,11 @@ where
// Add messages to uncommitted_state if there are any
if !diff.messages.is_empty() {
self.uncommitted_state.add_messages_with_statistics(
block_id_short.shard,
&block_id_short,
&diff.partition_router,
&diff.messages,
&statistics,
&max_message,
)?;
}

Expand Down Expand Up @@ -423,33 +422,6 @@ where
Ok(())
}

fn get_diffs_count_by_shard(&self, shard_ident: &ShardIdent) -> usize {
let uncommitted_count = self
.uncommitted_diffs
.get(shard_ident)
.map_or(0, |diffs| diffs.len());
let committed_count = self
.committed_diffs
.get(shard_ident)
.map_or(0, |diffs| diffs.len());

uncommitted_count + committed_count
}

fn trim_diffs(&self, source_shard: &ShardIdent, inclusive_until: &QueueKey) -> Result<()> {
if let Some(mut shard_diffs) = self.uncommitted_diffs.get_mut(source_shard) {
shard_diffs
.value_mut()
.retain(|_, diff| diff.max_message() > inclusive_until);
}
if let Some(mut shard_diffs) = self.committed_diffs.get_mut(source_shard) {
shard_diffs
.value_mut()
.retain(|_, diff| diff.max_message() > inclusive_until);
}
Ok(())
}

fn load_statistics(
&self,
partition: QueuePartitionIdx,
Expand Down Expand Up @@ -511,6 +483,26 @@ where
None
}

fn get_diffs_tail_len(&self, shard_ident: &ShardIdent, max_message_from: &QueueKey) -> u32 {
let uncommitted_tail_len = self
.uncommitted_state
.get_diffs_tail_len(shard_ident, max_message_from);

let committed_tail_len = self
.committed_state
.get_diffs_tail_len(shard_ident, max_message_from);

tracing::info!(
target: tracing_targets::MQ,
shard_ident = ?shard_ident,
uncommitted_tail_len,
committed_tail_len,
"Get diffs tail len",
);

uncommitted_tail_len + committed_tail_len
}

fn is_diff_exists(&self, block_id_short: &BlockIdShort) -> bool {
self.uncommitted_diffs
.get(&block_id_short.shard)
Expand Down
13 changes: 11 additions & 2 deletions collator/src/internal_queue/state/commited_state.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use anyhow::Result;
use everscale_types::models::{BlockId, IntAddr, ShardIdent};
use tycho_block_util::queue::QueuePartitionIdx;
use tycho_storage::model::ShardsInternalMessagesKey;
use tycho_block_util::queue::{QueueKey, QueuePartitionIdx};
use tycho_storage::model::{DiffTailKey, ShardsInternalMessagesKey};
use tycho_storage::{InternalQueueSnapshot, Storage};
use tycho_util::metrics::HistogramGuard;
use tycho_util::FastHashMap;
Expand Down Expand Up @@ -83,6 +83,7 @@ pub trait CommittedState<V: InternalMessageValue>: Send + Sync {

/// Get last applied mc block id
fn get_last_applied_mc_block_id(&self) -> Result<Option<BlockId>>;
fn get_diffs_tail_len(&self, shard_ident: &ShardIdent, from: &QueueKey) -> u32;
}

// IMPLEMENTATION
Expand Down Expand Up @@ -167,4 +168,12 @@ impl<V: InternalMessageValue> CommittedState<V> for CommittedStateStdImpl {
.internal_queue_storage()
.get_last_applied_mc_block_id()
}

fn get_diffs_tail_len(&self, shard_ident: &ShardIdent, from: &QueueKey) -> u32 {
let snapshot = self.storage.internal_queue_storage().make_snapshot();
snapshot.calc_diffs_tail_committed(&DiffTailKey {
shard_ident: *shard_ident,
max_message: *from,
})
}
}
38 changes: 33 additions & 5 deletions collator/src/internal_queue/state/uncommitted_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ use std::collections::BTreeMap;
use std::sync::Arc;

use anyhow::Result;
use everscale_types::models::{BlockId, IntAddr, ShardIdent};
use everscale_types::models::{BlockId, BlockIdShort, IntAddr, ShardIdent};
use tycho_block_util::queue::{QueueKey, QueuePartitionIdx, RouterAddr};
use tycho_storage::model::{QueueRange, ShardsInternalMessagesKey, StatKey};
use tycho_storage::model::{DiffTailKey, QueueRange, ShardsInternalMessagesKey, StatKey};
use tycho_storage::{InternalQueueSnapshot, InternalQueueTransaction, Storage};
use tycho_util::metrics::HistogramGuard;
use tycho_util::{FastHashMap, FastHashSet};
Expand Down Expand Up @@ -85,10 +85,11 @@ pub trait LocalUncommittedState<V: InternalMessageValue> {

fn add_messages_with_statistics(
&self,
source: ShardIdent,
block_id_short: &BlockIdShort,
partition_router: &PartitionRouter,
messages: &BTreeMap<QueueKey, Arc<V>>,
statistics: &DiffStatistics,
max_message: &QueueKey,
) -> Result<()>;

/// Load statistics for given partition and ranges
Expand All @@ -99,6 +100,9 @@ pub trait LocalUncommittedState<V: InternalMessageValue> {
partition: QueuePartitionIdx,
ranges: &[QueueShardRange],
) -> Result<()>;

/// Get diffs tail length
fn get_diffs_tail_len(&self, shard_ident: &ShardIdent, from: &QueueKey) -> u32;
}

// IMPLEMENTATION
Expand Down Expand Up @@ -169,15 +173,17 @@ impl<V: InternalMessageValue> UncommittedState<V> for UncommittedStateStdImpl {

fn add_messages_with_statistics(
&self,
source: ShardIdent,
block_id_short: &BlockIdShort,
partition_router: &PartitionRouter,
messages: &BTreeMap<QueueKey, Arc<V>>,
statistics: &DiffStatistics,
max_message: &QueueKey,
) -> Result<()> {
let mut tx = self.storage.internal_queue_storage().begin_transaction();

Self::add_messages(&mut tx, source, partition_router, messages)?;
Self::add_messages(&mut tx, block_id_short.shard, partition_router, messages)?;
Self::add_statistics(&mut tx, statistics)?;
Self::add_diff_tail(&mut tx, block_id_short, max_message);

let _histogram =
HistogramGuard::begin("tycho_internal_queue_add_messages_with_statistics_write_time");
Expand Down Expand Up @@ -207,6 +213,14 @@ impl<V: InternalMessageValue> UncommittedState<V> for UncommittedStateStdImpl {

Ok(())
}

fn get_diffs_tail_len(&self, shard_ident: &ShardIdent, from: &QueueKey) -> u32 {
let snapshot = self.storage.internal_queue_storage().make_snapshot();
snapshot.calc_diffs_tail_uncommitted(&DiffTailKey {
shard_ident: *shard_ident,
max_message: *from,
})
}
}

impl UncommittedStateStdImpl {
Expand Down Expand Up @@ -276,4 +290,18 @@ impl UncommittedStateStdImpl {

Ok(())
}

fn add_diff_tail(
internal_queue_tx: &mut InternalQueueTransaction,
block_id_short: &BlockIdShort,
max_message: &QueueKey,
) {
internal_queue_tx.insert_diff_tail_uncommitted(
&DiffTailKey {
shard_ident: block_id_short.shard,
max_message: *max_message,
},
block_id_short.seqno.to_le_bytes().as_slice(),
);
}
}
2 changes: 1 addition & 1 deletion collator/src/manager/blocks_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -932,7 +932,7 @@ impl MasterBlocksCacheData {

fn remove_last_collated_block_ids_from(&mut self, from_block_seqno: &BlockSeqno) {
self.last_collated_mc_block_ids
.retain(|seqno, _| seqno <= from_block_seqno);
.retain(|seqno, _| seqno < from_block_seqno);
}

fn remove_last_collated_block_ids_before(&mut self, before_block_seqno: &BlockSeqno) {
Expand Down
5 changes: 0 additions & 5 deletions collator/src/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1543,11 +1543,6 @@ where
)?;
}

// trim diffs tails for all shards
for (shard_id, min_processed_to) in &min_processed_to_by_shards {
self.mq_adapter.trim_diffs(shard_id, min_processed_to)?;
}

// sync all applied blocks
// and refresh collation session by the last one
// with re-init of collators state
Expand Down
24 changes: 6 additions & 18 deletions collator/src/queue_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,18 +70,16 @@ where
) -> Result<()>;

fn clear_uncommitted_state(&self) -> Result<()>;
/// Removes all diffs from the cache that are less than `inclusive_until` which source shard is `source_shard`
fn trim_diffs(&self, source_shard: &ShardIdent, inclusive_until: &QueueKey) -> Result<()>;
/// Get diffs for the given blocks from committed and uncommitted state
fn get_diffs(&self, blocks: FastHashMap<ShardIdent, u32>) -> Vec<(ShardIdent, ShortQueueDiff)>;
/// Get diff for the given block from committed and uncommitted state
fn get_diff(&self, shard_ident: ShardIdent, seqno: u32) -> Option<ShortQueueDiff>;
/// Returns the number of diffs in cache for the given shard
fn get_diffs_count_by_shard(&self, shard_ident: &ShardIdent) -> usize;
/// Check if diff exists in the cache
fn is_diff_exists(&self, block_id_short: &BlockIdShort) -> bool;
/// Get last applied mc block id from committed state
fn get_last_applied_mc_block_id(&self) -> Result<Option<BlockId>>;
/// Get diffs tail len from uncommitted state and committed state
fn get_diffs_tail_len(&self, shard_ident: &ShardIdent, max_message_from: &QueueKey) -> u32;
}

impl<V: InternalMessageValue> MessageQueueAdapterStdImpl<V> {
Expand Down Expand Up @@ -200,16 +198,6 @@ impl<V: InternalMessageValue> MessageQueueAdapter<V> for MessageQueueAdapterStdI
self.queue.clear_uncommitted_state()
}

fn trim_diffs(&self, source_shard: &ShardIdent, inclusive_until: &QueueKey) -> Result<()> {
tracing::info!(
target: tracing_targets::MQ_ADAPTER,
source_shard = ?source_shard,
inclusive_until = ?inclusive_until,
"Trimming diffs"
);
self.queue.trim_diffs(source_shard, inclusive_until)
}

fn get_diffs(&self, blocks: FastHashMap<ShardIdent, u32>) -> Vec<(ShardIdent, ShortQueueDiff)> {
self.queue.get_diffs(blocks)
}
Expand All @@ -218,15 +206,15 @@ impl<V: InternalMessageValue> MessageQueueAdapter<V> for MessageQueueAdapterStdI
self.queue.get_diff(shard_ident, seqno)
}

fn get_diffs_count_by_shard(&self, shard_ident: &ShardIdent) -> usize {
self.queue.get_diffs_count_by_shard(shard_ident)
}

fn is_diff_exists(&self, block_id_short: &BlockIdShort) -> bool {
self.queue.is_diff_exists(block_id_short)
}

fn get_last_applied_mc_block_id(&self) -> Result<Option<BlockId>> {
self.queue.get_last_applied_mc_block_id()
}

fn get_diffs_tail_len(&self, shard_ident: &ShardIdent, max_message_from: &QueueKey) -> u32 {
self.queue.get_diffs_tail_len(shard_ident, max_message_from)
}
}
22 changes: 15 additions & 7 deletions collator/tests/internal_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1137,24 +1137,32 @@ async fn test_queue_tail() -> anyhow::Result<()> {
max_message,
)?;

let diff_len_mc = queue.get_diffs_count_by_shard(&ShardIdent::MASTERCHAIN);
let diff_len_mc = queue.get_diffs_tail_len(&ShardIdent::MASTERCHAIN, &QueueKey::MIN);

// length 2 in uncommitted state
assert_eq!(diff_len_mc, 2);

// commit first diff
queue.commit_diff(&[(block_mc1, true)])?;
let diff_len_mc = queue.get_diffs_count_by_shard(&ShardIdent::MASTERCHAIN);

let diff_len_mc = queue.get_diffs_tail_len(&ShardIdent::MASTERCHAIN, &QueueKey::MIN);
// one diff moved to committed state. one diff left in uncommitted state
// uncommitted: 1; committed: 1
assert_eq!(diff_len_mc, 2);

// trim first diff
queue.trim_diffs(&ShardIdent::MASTERCHAIN, &end_key_mc1)?;
let diff_len_mc = queue.get_diffs_count_by_shard(&ShardIdent::MASTERCHAIN);
// exclude committed diff by range
let diff_len_mc = queue.get_diffs_tail_len(&ShardIdent::MASTERCHAIN, &end_key_mc1.next_value());
// uncommitted: 1; committed: 0 (1)
assert_eq!(diff_len_mc, 1);

// clear uncommitted state with second diff
queue.clear_uncommitted_state()?;
let diff_len_mc = queue.get_diffs_count_by_shard(&ShardIdent::MASTERCHAIN);
let diff_len_mc = queue.get_diffs_tail_len(&ShardIdent::MASTERCHAIN, &QueueKey::MIN);
// uncommitted: 0; committed: 1
assert_eq!(diff_len_mc, 1);

// exclude committed diff by range
let diff_len_mc = queue.get_diffs_tail_len(&ShardIdent::MASTERCHAIN, &end_key_mc1.next_value());
// uncommitted: 0; committed: 0 (1)
assert_eq!(diff_len_mc, 0);

Ok(())
Expand Down
2 changes: 2 additions & 0 deletions storage/src/db/kv_db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ weedb::tables! {
pub internal_message_stats: tables::InternalMessageStats,
pub internal_message_stats_uncommitted: tables::InternalMessageStatsUncommited,
pub internal_message_var: tables::InternalMessageVar,
pub internal_message_diffs_tail: tables::InternalMessageDiffsTail,
pub internal_message_diffs_tail_uncommitted: tables::InternalMessageDiffsTailUncommitted,
}
}

Expand Down
30 changes: 30 additions & 0 deletions storage/src/db/kv_db/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,36 @@ impl ColumnFamilyOptions<Caches> for InternalMessageVar {
}
}

pub struct InternalMessageDiffsTailUncommitted;
impl ColumnFamily for InternalMessageDiffsTailUncommitted {
const NAME: &'static str = "int_msg_diffs_tail_uncommitted";

fn read_options(opts: &mut ReadOptions) {
opts.set_verify_checksums(true);
}
}

impl ColumnFamilyOptions<Caches> for InternalMessageDiffsTailUncommitted {
fn options(opts: &mut Options, caches: &mut Caches) {
zstd_block_based_table_factory(opts, caches);
}
}

pub struct InternalMessageDiffsTail;
impl ColumnFamily for InternalMessageDiffsTail {
const NAME: &'static str = "int_msg_diffs_tail";

fn read_options(opts: &mut ReadOptions) {
opts.set_verify_checksums(true);
}
}

impl ColumnFamilyOptions<Caches> for InternalMessageDiffsTail {
fn options(opts: &mut Options, caches: &mut Caches) {
zstd_block_based_table_factory(opts, caches);
}
}

fn archive_data_merge(
_: &[u8],
current_value: Option<&[u8]>,
Expand Down
Loading

0 comments on commit 1bc4ef4

Please sign in to comment.