diff --git a/collator/src/collator/do_collate/mod.rs b/collator/src/collator/do_collate/mod.rs index 624fda9c3..a6bbfb3c1 100644 --- a/collator/src/collator/do_collate/mod.rs +++ b/collator/src/collator/do_collate/mod.rs @@ -346,11 +346,10 @@ impl CollatorStdImpl { &mc_data.shards_processed_to, ); - // trim outdated diffs and calc queue diffs tail lenght - if let Some(value) = min_processed_to { - mq_adapter.trim_diffs(&shard_id, &value)?; - }; - let diff_tail_len = mq_adapter.get_diffs_count_by_shard(&shard_id) as u32 + 1; + let diff_tail_len = mq_adapter.get_diffs_tail_len( + &shard_id, + &min_processed_to.unwrap_or_default().next_value(), + ) + 1; let span = tracing::Span::current(); let (finalize_phase_result, update_queue_task_result) = rayon::join( diff --git a/collator/src/internal_queue/queue.rs b/collator/src/internal_queue/queue.rs index 4966ddcb5..7195ebbc1 100644 --- a/collator/src/internal_queue/queue.rs +++ b/collator/src/internal_queue/queue.rs @@ -94,10 +94,8 @@ where fn commit_diff(&self, mc_top_blocks: &[(BlockId, bool)]) -> Result<()>; /// remove all data in uncommitted state storage fn clear_uncommitted_state(&self) -> Result<()>; - /// Returns the number of diffs in cache for the given shard - fn get_diffs_count_by_shard(&self, shard_ident: &ShardIdent) -> usize; - /// Removes all diffs from the cache that are less than `inclusive_until` which source shard is `source_shard` - fn trim_diffs(&self, source_shard: &ShardIdent, inclusive_until: &QueueKey) -> Result<()>; + /// Returns the diffs tail len for the given shard + fn get_diffs_tail_len(&self, shard_ident: &ShardIdent, from: &QueueKey) -> u32; /// Load statistics for the given range by accounts fn load_statistics( &self, @@ -290,10 +288,11 @@ where // Add messages to uncommitted_state if there are any if !diff.messages.is_empty() { self.uncommitted_state.add_messages_with_statistics( - block_id_short.shard, + &block_id_short, &diff.partition_router, &diff.messages, &statistics, + &max_message, )?; } @@ -423,33 +422,6 @@ where Ok(()) } - fn get_diffs_count_by_shard(&self, shard_ident: &ShardIdent) -> usize { - let uncommitted_count = self - .uncommitted_diffs - .get(shard_ident) - .map_or(0, |diffs| diffs.len()); - let committed_count = self - .committed_diffs - .get(shard_ident) - .map_or(0, |diffs| diffs.len()); - - uncommitted_count + committed_count - } - - fn trim_diffs(&self, source_shard: &ShardIdent, inclusive_until: &QueueKey) -> Result<()> { - if let Some(mut shard_diffs) = self.uncommitted_diffs.get_mut(source_shard) { - shard_diffs - .value_mut() - .retain(|_, diff| diff.max_message() > inclusive_until); - } - if let Some(mut shard_diffs) = self.committed_diffs.get_mut(source_shard) { - shard_diffs - .value_mut() - .retain(|_, diff| diff.max_message() > inclusive_until); - } - Ok(()) - } - fn load_statistics( &self, partition: QueuePartitionIdx, @@ -511,6 +483,26 @@ where None } + fn get_diffs_tail_len(&self, shard_ident: &ShardIdent, max_message_from: &QueueKey) -> u32 { + let uncommitted_tail_len = self + .uncommitted_state + .get_diffs_tail_len(shard_ident, max_message_from); + + let committed_tail_len = self + .committed_state + .get_diffs_tail_len(shard_ident, max_message_from); + + tracing::info!( + target: tracing_targets::MQ, + shard_ident = ?shard_ident, + uncommitted_tail_len, + committed_tail_len, + "Get diffs tail len", + ); + + uncommitted_tail_len + committed_tail_len + } + fn is_diff_exists(&self, block_id_short: &BlockIdShort) -> bool { self.uncommitted_diffs .get(&block_id_short.shard) diff --git a/collator/src/internal_queue/state/commited_state.rs b/collator/src/internal_queue/state/commited_state.rs index 1d065d055..b4552f435 100644 --- a/collator/src/internal_queue/state/commited_state.rs +++ b/collator/src/internal_queue/state/commited_state.rs @@ -1,7 +1,7 @@ use anyhow::Result; use everscale_types::models::{BlockId, IntAddr, ShardIdent}; -use tycho_block_util::queue::QueuePartitionIdx; -use tycho_storage::model::ShardsInternalMessagesKey; +use tycho_block_util::queue::{QueueKey, QueuePartitionIdx}; +use tycho_storage::model::{DiffTailKey, ShardsInternalMessagesKey}; use tycho_storage::{InternalQueueSnapshot, Storage}; use tycho_util::metrics::HistogramGuard; use tycho_util::FastHashMap; @@ -83,6 +83,7 @@ pub trait CommittedState: Send + Sync { /// Get last applied mc block id fn get_last_applied_mc_block_id(&self) -> Result>; + fn get_diffs_tail_len(&self, shard_ident: &ShardIdent, from: &QueueKey) -> u32; } // IMPLEMENTATION @@ -167,4 +168,12 @@ impl CommittedState for CommittedStateStdImpl { .internal_queue_storage() .get_last_applied_mc_block_id() } + + fn get_diffs_tail_len(&self, shard_ident: &ShardIdent, from: &QueueKey) -> u32 { + let snapshot = self.storage.internal_queue_storage().make_snapshot(); + snapshot.calc_diffs_tail_committed(&DiffTailKey { + shard_ident: *shard_ident, + max_message: *from, + }) + } } diff --git a/collator/src/internal_queue/state/uncommitted_state.rs b/collator/src/internal_queue/state/uncommitted_state.rs index c2aa6d553..7c2a0def2 100644 --- a/collator/src/internal_queue/state/uncommitted_state.rs +++ b/collator/src/internal_queue/state/uncommitted_state.rs @@ -2,9 +2,9 @@ use std::collections::BTreeMap; use std::sync::Arc; use anyhow::Result; -use everscale_types::models::{BlockId, IntAddr, ShardIdent}; +use everscale_types::models::{BlockId, BlockIdShort, IntAddr, ShardIdent}; use tycho_block_util::queue::{QueueKey, QueuePartitionIdx, RouterAddr}; -use tycho_storage::model::{QueueRange, ShardsInternalMessagesKey, StatKey}; +use tycho_storage::model::{DiffTailKey, QueueRange, ShardsInternalMessagesKey, StatKey}; use tycho_storage::{InternalQueueSnapshot, InternalQueueTransaction, Storage}; use tycho_util::metrics::HistogramGuard; use tycho_util::{FastHashMap, FastHashSet}; @@ -85,10 +85,11 @@ pub trait LocalUncommittedState { fn add_messages_with_statistics( &self, - source: ShardIdent, + block_id_short: &BlockIdShort, partition_router: &PartitionRouter, messages: &BTreeMap>, statistics: &DiffStatistics, + max_message: &QueueKey, ) -> Result<()>; /// Load statistics for given partition and ranges @@ -99,6 +100,9 @@ pub trait LocalUncommittedState { partition: QueuePartitionIdx, ranges: &[QueueShardRange], ) -> Result<()>; + + /// Get diffs tail length + fn get_diffs_tail_len(&self, shard_ident: &ShardIdent, from: &QueueKey) -> u32; } // IMPLEMENTATION @@ -169,15 +173,17 @@ impl UncommittedState for UncommittedStateStdImpl { fn add_messages_with_statistics( &self, - source: ShardIdent, + block_id_short: &BlockIdShort, partition_router: &PartitionRouter, messages: &BTreeMap>, statistics: &DiffStatistics, + max_message: &QueueKey, ) -> Result<()> { let mut tx = self.storage.internal_queue_storage().begin_transaction(); - Self::add_messages(&mut tx, source, partition_router, messages)?; + Self::add_messages(&mut tx, block_id_short.shard, partition_router, messages)?; Self::add_statistics(&mut tx, statistics)?; + Self::add_diff_tail(&mut tx, block_id_short, max_message); let _histogram = HistogramGuard::begin("tycho_internal_queue_add_messages_with_statistics_write_time"); @@ -207,6 +213,14 @@ impl UncommittedState for UncommittedStateStdImpl { Ok(()) } + + fn get_diffs_tail_len(&self, shard_ident: &ShardIdent, from: &QueueKey) -> u32 { + let snapshot = self.storage.internal_queue_storage().make_snapshot(); + snapshot.calc_diffs_tail_uncommitted(&DiffTailKey { + shard_ident: *shard_ident, + max_message: *from, + }) + } } impl UncommittedStateStdImpl { @@ -276,4 +290,18 @@ impl UncommittedStateStdImpl { Ok(()) } + + fn add_diff_tail( + internal_queue_tx: &mut InternalQueueTransaction, + block_id_short: &BlockIdShort, + max_message: &QueueKey, + ) { + internal_queue_tx.insert_diff_tail_uncommitted( + &DiffTailKey { + shard_ident: block_id_short.shard, + max_message: *max_message, + }, + block_id_short.seqno.to_le_bytes().as_slice(), + ); + } } diff --git a/collator/src/manager/blocks_cache.rs b/collator/src/manager/blocks_cache.rs index f8d117a2e..c87436c6b 100644 --- a/collator/src/manager/blocks_cache.rs +++ b/collator/src/manager/blocks_cache.rs @@ -932,7 +932,7 @@ impl MasterBlocksCacheData { fn remove_last_collated_block_ids_from(&mut self, from_block_seqno: &BlockSeqno) { self.last_collated_mc_block_ids - .retain(|seqno, _| seqno <= from_block_seqno); + .retain(|seqno, _| seqno < from_block_seqno); } fn remove_last_collated_block_ids_before(&mut self, before_block_seqno: &BlockSeqno) { diff --git a/collator/src/manager/mod.rs b/collator/src/manager/mod.rs index 96b10ceb2..ce54c75f2 100644 --- a/collator/src/manager/mod.rs +++ b/collator/src/manager/mod.rs @@ -1543,11 +1543,6 @@ where )?; } - // trim diffs tails for all shards - for (shard_id, min_processed_to) in &min_processed_to_by_shards { - self.mq_adapter.trim_diffs(shard_id, min_processed_to)?; - } - // sync all applied blocks // and refresh collation session by the last one // with re-init of collators state diff --git a/collator/src/queue_adapter.rs b/collator/src/queue_adapter.rs index 664e1727f..968cd3934 100644 --- a/collator/src/queue_adapter.rs +++ b/collator/src/queue_adapter.rs @@ -70,18 +70,16 @@ where ) -> Result<()>; fn clear_uncommitted_state(&self) -> Result<()>; - /// Removes all diffs from the cache that are less than `inclusive_until` which source shard is `source_shard` - fn trim_diffs(&self, source_shard: &ShardIdent, inclusive_until: &QueueKey) -> Result<()>; /// Get diffs for the given blocks from committed and uncommitted state fn get_diffs(&self, blocks: FastHashMap) -> Vec<(ShardIdent, ShortQueueDiff)>; /// Get diff for the given block from committed and uncommitted state fn get_diff(&self, shard_ident: ShardIdent, seqno: u32) -> Option; - /// Returns the number of diffs in cache for the given shard - fn get_diffs_count_by_shard(&self, shard_ident: &ShardIdent) -> usize; /// Check if diff exists in the cache fn is_diff_exists(&self, block_id_short: &BlockIdShort) -> bool; /// Get last applied mc block id from committed state fn get_last_applied_mc_block_id(&self) -> Result>; + /// Get diffs tail len from uncommitted state and committed state + fn get_diffs_tail_len(&self, shard_ident: &ShardIdent, max_message_from: &QueueKey) -> u32; } impl MessageQueueAdapterStdImpl { @@ -200,16 +198,6 @@ impl MessageQueueAdapter for MessageQueueAdapterStdI self.queue.clear_uncommitted_state() } - fn trim_diffs(&self, source_shard: &ShardIdent, inclusive_until: &QueueKey) -> Result<()> { - tracing::info!( - target: tracing_targets::MQ_ADAPTER, - source_shard = ?source_shard, - inclusive_until = ?inclusive_until, - "Trimming diffs" - ); - self.queue.trim_diffs(source_shard, inclusive_until) - } - fn get_diffs(&self, blocks: FastHashMap) -> Vec<(ShardIdent, ShortQueueDiff)> { self.queue.get_diffs(blocks) } @@ -218,10 +206,6 @@ impl MessageQueueAdapter for MessageQueueAdapterStdI self.queue.get_diff(shard_ident, seqno) } - fn get_diffs_count_by_shard(&self, shard_ident: &ShardIdent) -> usize { - self.queue.get_diffs_count_by_shard(shard_ident) - } - fn is_diff_exists(&self, block_id_short: &BlockIdShort) -> bool { self.queue.is_diff_exists(block_id_short) } @@ -229,4 +213,8 @@ impl MessageQueueAdapter for MessageQueueAdapterStdI fn get_last_applied_mc_block_id(&self) -> Result> { self.queue.get_last_applied_mc_block_id() } + + fn get_diffs_tail_len(&self, shard_ident: &ShardIdent, max_message_from: &QueueKey) -> u32 { + self.queue.get_diffs_tail_len(shard_ident, max_message_from) + } } diff --git a/collator/tests/internal_queue.rs b/collator/tests/internal_queue.rs index 52f3b6b23..a4cdca648 100644 --- a/collator/tests/internal_queue.rs +++ b/collator/tests/internal_queue.rs @@ -1137,24 +1137,32 @@ async fn test_queue_tail() -> anyhow::Result<()> { max_message, )?; - let diff_len_mc = queue.get_diffs_count_by_shard(&ShardIdent::MASTERCHAIN); + let diff_len_mc = queue.get_diffs_tail_len(&ShardIdent::MASTERCHAIN, &QueueKey::MIN); + // length 2 in uncommitted state assert_eq!(diff_len_mc, 2); // commit first diff queue.commit_diff(&[(block_mc1, true)])?; - let diff_len_mc = queue.get_diffs_count_by_shard(&ShardIdent::MASTERCHAIN); - + let diff_len_mc = queue.get_diffs_tail_len(&ShardIdent::MASTERCHAIN, &QueueKey::MIN); + // one diff moved to committed state. one diff left in uncommitted state + // uncommitted: 1; committed: 1 assert_eq!(diff_len_mc, 2); - // trim first diff - queue.trim_diffs(&ShardIdent::MASTERCHAIN, &end_key_mc1)?; - let diff_len_mc = queue.get_diffs_count_by_shard(&ShardIdent::MASTERCHAIN); + // exclude committed diff by range + let diff_len_mc = queue.get_diffs_tail_len(&ShardIdent::MASTERCHAIN, &end_key_mc1.next_value()); + // uncommitted: 1; committed: 0 (1) assert_eq!(diff_len_mc, 1); // clear uncommitted state with second diff queue.clear_uncommitted_state()?; - let diff_len_mc = queue.get_diffs_count_by_shard(&ShardIdent::MASTERCHAIN); + let diff_len_mc = queue.get_diffs_tail_len(&ShardIdent::MASTERCHAIN, &QueueKey::MIN); + // uncommitted: 0; committed: 1 + assert_eq!(diff_len_mc, 1); + + // exclude committed diff by range + let diff_len_mc = queue.get_diffs_tail_len(&ShardIdent::MASTERCHAIN, &end_key_mc1.next_value()); + // uncommitted: 0; committed: 0 (1) assert_eq!(diff_len_mc, 0); Ok(()) diff --git a/storage/src/db/kv_db/mod.rs b/storage/src/db/kv_db/mod.rs index 27eecfd16..4ad369866 100644 --- a/storage/src/db/kv_db/mod.rs +++ b/storage/src/db/kv_db/mod.rs @@ -137,6 +137,8 @@ weedb::tables! { pub internal_message_stats: tables::InternalMessageStats, pub internal_message_stats_uncommitted: tables::InternalMessageStatsUncommited, pub internal_message_var: tables::InternalMessageVar, + pub internal_message_diffs_tail: tables::InternalMessageDiffsTail, + pub internal_message_diffs_tail_uncommitted: tables::InternalMessageDiffsTailUncommitted, } } diff --git a/storage/src/db/kv_db/tables.rs b/storage/src/db/kv_db/tables.rs index 5fe3d5bf4..b6683293d 100644 --- a/storage/src/db/kv_db/tables.rs +++ b/storage/src/db/kv_db/tables.rs @@ -518,6 +518,36 @@ impl ColumnFamilyOptions for InternalMessageVar { } } +pub struct InternalMessageDiffsTailUncommitted; +impl ColumnFamily for InternalMessageDiffsTailUncommitted { + const NAME: &'static str = "int_msg_diffs_tail_uncommitted"; + + fn read_options(opts: &mut ReadOptions) { + opts.set_verify_checksums(true); + } +} + +impl ColumnFamilyOptions for InternalMessageDiffsTailUncommitted { + fn options(opts: &mut Options, caches: &mut Caches) { + zstd_block_based_table_factory(opts, caches); + } +} + +pub struct InternalMessageDiffsTail; +impl ColumnFamily for InternalMessageDiffsTail { + const NAME: &'static str = "int_msg_diffs_tail"; + + fn read_options(opts: &mut ReadOptions) { + opts.set_verify_checksums(true); + } +} + +impl ColumnFamilyOptions for InternalMessageDiffsTail { + fn options(opts: &mut Options, caches: &mut Caches) { + zstd_block_based_table_factory(opts, caches); + } +} + fn archive_data_merge( _: &[u8], current_value: Option<&[u8]>, diff --git a/storage/src/store/internal_queue/mod.rs b/storage/src/store/internal_queue/mod.rs index c9621be32..a57b8e12a 100644 --- a/storage/src/store/internal_queue/mod.rs +++ b/storage/src/store/internal_queue/mod.rs @@ -8,7 +8,7 @@ use weedb::rocksdb::{DBRawIterator, WriteBatch}; use weedb::{BoundedCfHandle, ColumnFamily, OwnedRawIterator, OwnedSnapshot, Table}; use crate::db::*; -use crate::model::{QueueRange, ShardsInternalMessagesKey, StatKey}; +use crate::model::{DiffTailKey, QueueRange, ShardsInternalMessagesKey, StatKey}; use crate::util::StoredValue; use crate::QueueStateReader; @@ -72,6 +72,7 @@ impl InternalQueueStorage { let messages_cf = this.db.shard_internal_messages.cf(); let stats_cf = this.db.internal_message_stats.cf(); let var_cf = this.db.internal_message_var.cf(); + let diffs_tail_cf = this.db.internal_message_diffs_tail.cf(); let mut batch = weedb::rocksdb::WriteBatch::default(); @@ -130,6 +131,19 @@ impl InternalQueueStorage { let queue_diff = part.queue_diff(); + // insert diff tail + let diff_tail_key = DiffTailKey { + shard_ident: block_id.shard, + max_message: queue_diff.max_message, + }; + + batch.put_cf( + &diffs_tail_cf, + diff_tail_key.to_vec(), + block_id.seqno.to_le_bytes().as_slice(), + ); + + // insert last applied diff batch.put_cf( &var_cf, INT_QUEUE_LAST_APPLIED_MC_BLOCK_ID_KEY, @@ -182,9 +196,11 @@ impl InternalQueueStorage { let mut msgs_to_compact = Vec::new(); let mut stats_to_compact = Vec::new(); + let mut diffs_tail_to_compact = Vec::new(); let messages_cf = &self.db.shard_internal_messages.cf(); let stats_cf = &self.db.internal_message_stats.cf(); + let diffs_tail_cf = &self.db.internal_message_diffs_tail.cf(); for range in ranges { // Delete messages in one range @@ -228,6 +244,26 @@ impl InternalQueueStorage { &bump, &mut stats_to_compact, ); + + // Delete diffs tail in one range + let start_diff_tail_key = DiffTailKey { + shard_ident: range.shard_ident, + max_message: range.from, + }; + + let end_diff_tail_key = DiffTailKey { + shard_ident: range.shard_ident, + max_message: range.to, + }; + + delete_range( + &mut batch, + diffs_tail_cf, + &start_diff_tail_key.to_vec(), + &end_diff_tail_key.to_vec(), + &bump, + &mut diffs_tail_to_compact, + ); } let db = self.db.rocksdb().as_ref(); @@ -239,6 +275,9 @@ impl InternalQueueStorage { for (start_key, end_key) in stats_to_compact { db.compact_range_cf(stats_cf, Some(start_key), Some(end_key)); } + for (start_key, end_key) in diffs_tail_to_compact { + db.compact_range_cf(diffs_tail_cf, Some(start_key), Some(end_key)); + } Ok(()) } @@ -265,11 +304,19 @@ impl InternalQueueStorage { &[0xff; StatKey::SIZE_HINT], ); + let diffs_tail_cf = &self.db.internal_message_diffs_tail_uncommitted.cf(); + clear_table( + diffs_tail_cf, + &[0x00; StatKey::SIZE_HINT], + &[0xff; StatKey::SIZE_HINT], + ); + let db = self.db.rocksdb().as_ref(); db.write(batch)?; db.compact_range_cf(messages_cf, None::<[u8; 0]>, None::<[u8; 0]>); db.compact_range_cf(stats_cf, None::<[u8; 0]>, None::<[u8; 0]>); + db.compact_range_cf(diffs_tail_cf, None::<[u8; 0]>, None::<[u8; 0]>); Ok(()) } @@ -307,6 +354,11 @@ impl InternalQueueTransaction { self.batch.put_cf(&cf, key.to_vec(), count.to_le_bytes()); } + pub fn insert_diff_tail_uncommitted(&mut self, key: &DiffTailKey, value: &[u8]) { + let cf = self.db.internal_message_diffs_tail_uncommitted.cf(); + self.batch.put_cf(&cf, key.to_vec(), value); + } + pub fn insert_message_uncommitted( &mut self, key: &ShardsInternalMessagesKey, @@ -376,12 +428,21 @@ impl InternalQueueTransaction { let uncommited_stats = &self.db.internal_message_stats_uncommitted; let uncommited_stats_cf = &uncommited_stats.cf(); + let diff_tail_committed_cf = &self.db.internal_message_diffs_tail.cf(); + let diff_tail_uncommitted_cf = &self.db.internal_message_diffs_tail_uncommitted.cf(); + let mut uncommited_stats_iter = { let mut readopts = uncommited_stats.new_read_config(); readopts.set_snapshot(&snapshot.snapshot); db.raw_iterator_cf_opt(uncommited_stats_cf, readopts) }; + let mut uncommited_diff_tail_iter = { + let mut readopts = uncommited_stats.new_read_config(); + readopts.set_snapshot(&snapshot.snapshot); + db.raw_iterator_cf_opt(diff_tail_uncommitted_cf, readopts) + }; + for range in ranges { // Commit messages for one range let from_message_key = ShardsInternalMessagesKey { @@ -426,6 +487,25 @@ impl InternalQueueTransaction { uncommited_stats_cf, stats_cf, )?; + + // Collect diffs tails range + let from_diff_tail_key = DiffTailKey { + shard_ident: range.shard_ident, + max_message: range.from, + }; + + let to_diff_tail_key = DiffTailKey { + shard_ident: range.shard_ident, + max_message: range.to, + }; + + commit_range( + &mut uncommited_diff_tail_iter, + &from_diff_tail_key.to_vec(), + &to_diff_tail_key.to_vec(), + diff_tail_uncommitted_cf, + diff_tail_committed_cf, + )?; } Ok(()) @@ -466,6 +546,14 @@ impl InternalQueueSnapshot { self.iter_messages(&self.db.shard_internal_messages_uncommitted, from, to) } + pub fn calc_diffs_tail_committed(&self, from: &DiffTailKey) -> u32 { + self.calc_diffs_tail(&self.db.internal_message_diffs_tail, from) + } + + pub fn calc_diffs_tail_uncommitted(&self, from: &DiffTailKey) -> u32 { + self.calc_diffs_tail(&self.db.internal_message_diffs_tail_uncommitted, from) + } + fn iter_messages( &self, table: &Table, @@ -488,6 +576,25 @@ impl InternalQueueSnapshot { } } + fn calc_diffs_tail(&self, table: &Table, from: &DiffTailKey) -> u32 { + let mut read_config = table.new_read_config(); + read_config.set_snapshot(&self.snapshot); + + let cf = table.cf(); + let mut iter = self.db.rocksdb().raw_iterator_cf_opt(&cf, read_config); + + let from_key = from.to_vec(); + iter.seek(&from_key); + + let mut count = 0; + while let Some((_, _)) = iter.item() { + count += 1; + iter.next(); + } + + count + } + pub fn collect_committed_stats_in_range( &self, shard_ident: ShardIdent, diff --git a/storage/src/store/internal_queue/model.rs b/storage/src/store/internal_queue/model.rs index 215702fcd..76f3c9438 100644 --- a/storage/src/store/internal_queue/model.rs +++ b/storage/src/store/internal_queue/model.rs @@ -144,6 +144,21 @@ impl StatKey { } } +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] +pub struct DiffTailKey { + pub shard_ident: ShardIdent, + pub max_message: QueueKey, +} + +impl DiffTailKey { + pub fn new(shard_ident: ShardIdent, max_message: QueueKey) -> Self { + Self { + shard_ident, + max_message, + } + } +} + impl StoredValue for StatKey { const SIZE_HINT: usize = ShardIdent::SIZE_HINT + QueuePartitionIdx::SIZE_HINT @@ -181,6 +196,33 @@ impl StoredValue for StatKey { } } +impl StoredValue for DiffTailKey { + const SIZE_HINT: usize = ShardIdent::SIZE_HINT + QueueKey::SIZE_HINT; + type OnStackSlice = [u8; Self::SIZE_HINT]; + + fn serialize(&self, buffer: &mut T) { + self.shard_ident.serialize(buffer); + self.max_message.serialize(buffer); + } + + fn deserialize(reader: &mut &[u8]) -> Self + where + Self: Sized, + { + if reader.len() < Self::SIZE_HINT { + panic!("Insufficient data for deserialization"); + } + + let shard_ident = ShardIdent::deserialize(reader); + let max_message = QueueKey::deserialize(reader); + + Self { + shard_ident, + max_message, + } + } +} + pub struct QueueRange { pub shard_ident: ShardIdent, pub partition: QueuePartitionIdx,