diff --git a/CHANGELOG.md b/CHANGELOG.md index e4d5b0588f9..19a3f28707a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,8 @@ As a minor extension, we have adopted a slightly different versioning convention - Support computation of the Cardano Transactions signature and proving with the pre-computed Block Range Merkle Roots retrieved from the database. +- Prune Cardano Transactions from the signer database after the Block Range Merkle Roots have been computed. + - Update website and explorer user interface to use the new mithril logo. - Crates versions: diff --git a/Cargo.lock b/Cargo.lock index d3d8a45cc46..0ba63cfd7bd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3452,7 +3452,7 @@ dependencies = [ [[package]] name = "mithril-aggregator" -version = "0.5.7" +version = "0.5.8" dependencies = [ "anyhow", "async-trait", @@ -3705,7 +3705,7 @@ dependencies = [ [[package]] name = "mithril-persistence" -version = "0.1.11" +version = "0.1.12" dependencies = [ "anyhow", "async-trait", @@ -3752,7 +3752,7 @@ dependencies = [ [[package]] name = "mithril-signer" -version = "0.2.135" +version = "0.2.136" dependencies = [ "anyhow", "async-trait", diff --git a/internal/mithril-persistence/Cargo.toml b/internal/mithril-persistence/Cargo.toml index 8990bfbcfe7..ca467a41922 100644 --- a/internal/mithril-persistence/Cargo.toml +++ b/internal/mithril-persistence/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mithril-persistence" -version = "0.1.11" +version = "0.1.12" description = "Common types, interfaces, and utilities to persist data for Mithril nodes." authors = { workspace = true } edition = { workspace = true } diff --git a/internal/mithril-persistence/src/database/provider/cardano_transaction/delete_cardano_transaction.rs b/internal/mithril-persistence/src/database/provider/cardano_transaction/delete_cardano_transaction.rs new file mode 100644 index 00000000000..f0ffdd06e26 --- /dev/null +++ b/internal/mithril-persistence/src/database/provider/cardano_transaction/delete_cardano_transaction.rs @@ -0,0 +1,142 @@ +use anyhow::Context; +use sqlite::Value; + +use mithril_common::entities::BlockNumber; +use mithril_common::StdResult; + +use crate::database::record::CardanoTransactionRecord; +use crate::sqlite::{ + EntityCursor, Provider, SourceAlias, SqLiteEntity, SqliteConnection, WhereCondition, +}; + +/// Query to delete old [CardanoTransactionRecord] from the sqlite database +pub struct DeleteCardanoTransactionProvider<'conn> { + connection: &'conn SqliteConnection, +} + +impl<'conn> Provider<'conn> for DeleteCardanoTransactionProvider<'conn> { + type Entity = CardanoTransactionRecord; + + fn get_connection(&'conn self) -> &'conn SqliteConnection { + self.connection + } + + fn get_definition(&self, condition: &str) -> String { + // it is important to alias the fields with the same name as the table + // since the table cannot be aliased in a RETURNING statement in SQLite. + let projection = Self::Entity::get_projection() + .expand(SourceAlias::new(&[("{:cardano_tx:}", "cardano_tx")])); + + format!("delete from cardano_tx where {condition} returning {projection}") + } +} + +impl<'conn> DeleteCardanoTransactionProvider<'conn> { + /// Create a new instance + pub fn new(connection: &'conn SqliteConnection) -> Self { + Self { connection } + } + + fn get_prune_condition( + &self, + block_number_threshold: BlockNumber, + ) -> StdResult { + let threshold = Value::Integer(block_number_threshold.try_into().with_context(|| { + format!("Failed to convert threshold `{block_number_threshold}` to i64") + })?); + + Ok(WhereCondition::new("block_number < ?*", vec![threshold])) + } + + /// Prune the cardano transaction data below the given threshold. + pub fn prune( + &self, + block_number_threshold: BlockNumber, + ) -> StdResult> { + let filters = self.get_prune_condition(block_number_threshold)?; + + self.find(filters) + } +} + +#[cfg(test)] +mod tests { + use crate::database::provider::{ + GetCardanoTransactionProvider, InsertCardanoTransactionProvider, + }; + use crate::database::test_helper::cardano_tx_db_connection; + use crate::sqlite::GetAllProvider; + + use super::*; + + fn insert_transactions(connection: &SqliteConnection, records: Vec) { + let provider = InsertCardanoTransactionProvider::new(connection); + let condition = provider.get_insert_many_condition(records).unwrap(); + let mut cursor = provider.find(condition).unwrap(); + cursor.next().unwrap(); + } + + fn test_transaction_set() -> Vec { + vec![ + CardanoTransactionRecord::new("tx-hash-0", 10, 50, "block-hash-10", 1), + CardanoTransactionRecord::new("tx-hash-1", 10, 51, "block-hash-10", 1), + CardanoTransactionRecord::new("tx-hash-2", 11, 52, "block-hash-11", 1), + CardanoTransactionRecord::new("tx-hash-3", 11, 53, "block-hash-11", 1), + CardanoTransactionRecord::new("tx-hash-4", 12, 54, "block-hash-12", 1), + CardanoTransactionRecord::new("tx-hash-5", 12, 55, "block-hash-12", 1), + ] + } + + #[test] + fn test_prune_work_even_without_transactions_in_db() { + let connection = cardano_tx_db_connection().unwrap(); + + let prune_provider = DeleteCardanoTransactionProvider::new(&connection); + let cursor = prune_provider + .prune(100) + .expect("pruning shouldn't crash without transactions stored"); + assert_eq!(0, cursor.count()); + } + + #[test] + fn test_prune_all_data_if_given_block_number_is_larger_than_stored_number_of_block() { + let connection = cardano_tx_db_connection().unwrap(); + insert_transactions(&connection, test_transaction_set()); + + let prune_provider = DeleteCardanoTransactionProvider::new(&connection); + let cursor = prune_provider.prune(100_000).unwrap(); + assert_eq!(test_transaction_set().len(), cursor.count()); + + let get_provider = GetCardanoTransactionProvider::new(&connection); + let cursor = get_provider.get_all().unwrap(); + assert_eq!(0, cursor.count()); + } + + #[test] + fn test_prune_keep_all_tx_of_last_block_if_given_number_of_block_is_zero() { + let connection = cardano_tx_db_connection().unwrap(); + insert_transactions(&connection, test_transaction_set()); + + let prune_provider = DeleteCardanoTransactionProvider::new(&connection); + let cursor = prune_provider.prune(0).unwrap(); + assert_eq!(0, cursor.count()); + + let get_provider = GetCardanoTransactionProvider::new(&connection); + let cursor = get_provider.get_all().unwrap(); + assert_eq!(test_transaction_set().len(), cursor.count()); + } + + #[test] + fn test_prune_data_of_below_given_blocks() { + let connection = cardano_tx_db_connection().unwrap(); + insert_transactions(&connection, test_transaction_set()); + + let prune_provider = DeleteCardanoTransactionProvider::new(&connection); + let cursor = prune_provider.prune(12).unwrap(); + assert_eq!(4, cursor.count()); + + let get_provider = GetCardanoTransactionProvider::new(&connection); + let cursor = get_provider.get_all().unwrap(); + assert_eq!(2, cursor.count()); + } +} diff --git a/internal/mithril-persistence/src/database/provider/cardano_transaction/mod.rs b/internal/mithril-persistence/src/database/provider/cardano_transaction/mod.rs index 35b953c9b4e..1c5de369800 100644 --- a/internal/mithril-persistence/src/database/provider/cardano_transaction/mod.rs +++ b/internal/mithril-persistence/src/database/provider/cardano_transaction/mod.rs @@ -1,5 +1,7 @@ +mod delete_cardano_transaction; mod get_cardano_transaction; mod insert_cardano_transaction; +pub use delete_cardano_transaction::*; pub use get_cardano_transaction::*; pub use insert_cardano_transaction::*; diff --git a/internal/mithril-persistence/src/database/repository/cardano_transaction_repository.rs b/internal/mithril-persistence/src/database/repository/cardano_transaction_repository.rs index 24e187e3a9f..f637a1e47d2 100644 --- a/internal/mithril-persistence/src/database/repository/cardano_transaction_repository.rs +++ b/internal/mithril-persistence/src/database/repository/cardano_transaction_repository.rs @@ -14,12 +14,14 @@ use mithril_common::signable_builder::BlockRangeRootRetriever; use mithril_common::StdResult; use crate::database::provider::{ - GetBlockRangeRootProvider, GetCardanoTransactionProvider, + DeleteCardanoTransactionProvider, GetBlockRangeRootProvider, GetCardanoTransactionProvider, GetIntervalWithoutBlockRangeRootProvider, InsertBlockRangeRootProvider, InsertCardanoTransactionProvider, }; use crate::database::record::{BlockRangeRootRecord, CardanoTransactionRecord}; -use crate::sqlite::{GetAllProvider, Provider, SqliteConnection, WhereCondition}; +use crate::sqlite::{ + ConnectionExtensions, GetAllProvider, Provider, SqliteConnection, WhereCondition, +}; /// ## Cardano transaction repository /// @@ -147,32 +149,31 @@ impl CardanoTransactionRepository { &self, immutable_file_number: ImmutableFileNumber, ) -> StdResult> { - let sql = - "select max(block_number) as highest from cardano_tx where immutable_file_number <= $1;"; - match self + let highest: Option = self.connection.query_single_cell( + "select max(block_number) as highest from cardano_tx where immutable_file_number <= ?;", + &[Value::Integer(immutable_file_number as i64)], + )?; + highest + .map(u64::try_from) + .transpose() + .with_context(|| + format!("Integer field max(block_number) (value={highest:?}) is incompatible with u64 representation.") + ) + } + + /// Get the highest start [BlockNumber] of the block range roots stored in the database. + pub async fn get_highest_start_block_number_for_block_range_roots( + &self, + ) -> StdResult> { + let highest: Option = self .connection - .prepare(sql) - .with_context(|| { - format!( - "Prepare query error: SQL=`{}`", - &sql.replace('\n', " ").trim() - ) - })? - .iter() - .bind::<&[(_, Value)]>(&[(1, Value::Integer(immutable_file_number as i64))])? - .next() - { - None => Ok(None), - Some(row) => { - let highest = row?.read::, _>(0); - highest - .map(u64::try_from) - .transpose() - .with_context(|| - format!("Integer field max(block_number) (value={highest:?}) is incompatible with u64 representation.") - ) - } - } + .query_single_cell("select max(start) as highest from block_range_root;", &[])?; + highest + .map(u64::try_from) + .transpose() + .with_context(|| + format!("Integer field max(start) (value={highest:?}) is incompatible with u64 representation.") + ) } /// Retrieve all the Block Range Roots in database up to the given end block number excluded. @@ -212,30 +213,16 @@ impl CardanoTransactionRepository { pub async fn get_transaction_highest_immutable_file_number( &self, ) -> StdResult> { - let sql = "select max(immutable_file_number) as highest from cardano_tx;"; - match self - .connection - .prepare(sql) - .with_context(|| { - format!( - "Prepare query error: SQL=`{}`", - &sql.replace('\n', " ").trim() - ) - })? - .iter() - .next() - { - None => Ok(None), - Some(row) => { - let highest = row?.read::, _>(0); - highest - .map(u64::try_from) - .transpose() - .with_context(|| - format!("Integer field max(immutable_file_number) (value={highest:?}) is incompatible with u64 representation.") - ) - } - } + let highest: Option = self.connection.query_single_cell( + "select max(immutable_file_number) as highest from cardano_tx;", + &[], + )?; + highest + .map(u64::try_from) + .transpose() + .with_context(|| + format!("Integer field max(immutable_file_number) (value={highest:?}) is incompatible with u64 representation.") + ) } /// Store the given transactions in the database. @@ -301,6 +288,21 @@ impl CardanoTransactionRepository { Ok(transactions.collect()) } + + /// Prune the transactions older than the given number of blocks (based on the block range root + /// stored). + pub async fn prune_transaction(&self, number_of_blocks_to_keep: BlockNumber) -> StdResult<()> { + if let Some(highest_block_range_start) = self + .get_highest_start_block_number_for_block_range_roots() + .await? + { + let provider = DeleteCardanoTransactionProvider::new(&self.connection); + let threshold = highest_block_range_start.saturating_sub(number_of_blocks_to_keep); + provider.prune(threshold)?.next(); + } + + Ok(()) + } } #[async_trait] @@ -863,4 +865,84 @@ mod tests { ); } } + + #[tokio::test] + async fn repository_prune_transactions() { + let connection = Arc::new(cardano_tx_db_connection().unwrap()); + let repository = CardanoTransactionRepository::new(connection); + + // Build transactions with block numbers from 20 to 50 + let cardano_transactions: Vec = (20..=50) + .map(|i| CardanoTransactionRecord { + transaction_hash: format!("tx-hash-{i}"), + block_number: i, + slot_number: i * 100, + block_hash: format!("block-hash-{i}"), + immutable_file_number: 1, + }) + .collect(); + + repository + .create_transactions(cardano_transactions.clone()) + .await + .unwrap(); + repository + .create_block_range_roots(vec![( + BlockRange::from_block_number(45), + MKTreeNode::from_hex("BBBB").unwrap(), + )]) + .await + .unwrap(); + + let transaction_result = repository.get_all().await.unwrap(); + assert_eq!(cardano_transactions.len(), transaction_result.len()); + + // Pruning with a number of block to keep greater than the highest block range start should + // do nothing. + repository.prune_transaction(10_000_000).await.unwrap(); + let transaction_result = repository.get_all_transactions().await.unwrap(); + assert_eq!(cardano_transactions, transaction_result); + + // Since the highest block range start is 45, pruning with 20 should remove transactions + // with a block number strictly below 25. + repository.prune_transaction(20).await.unwrap(); + let transaction_result = repository + .get_transactions_in_range_blocks(0..25) + .await + .unwrap(); + assert_eq!(Vec::::new(), transaction_result); + } + + #[tokio::test] + async fn get_highest_start_block_number_for_block_range_roots() { + let connection = Arc::new(cardano_tx_db_connection().unwrap()); + let repository = CardanoTransactionRepository::new(connection); + + let highest = repository + .get_highest_start_block_number_for_block_range_roots() + .await + .unwrap(); + assert_eq!(None, highest); + + let block_range_roots = vec![ + ( + BlockRange::from_block_number(15), + MKTreeNode::from_hex("AAAA").unwrap(), + ), + ( + BlockRange::from_block_number(30), + MKTreeNode::from_hex("BBBB").unwrap(), + ), + ]; + repository + .create_block_range_roots(block_range_roots.clone()) + .await + .unwrap(); + + let highest = repository + .get_highest_start_block_number_for_block_range_roots() + .await + .unwrap(); + assert_eq!(Some(30), highest); + } } diff --git a/internal/mithril-persistence/src/sqlite/connection_extensions.rs b/internal/mithril-persistence/src/sqlite/connection_extensions.rs new file mode 100644 index 00000000000..5b5bd56a49b --- /dev/null +++ b/internal/mithril-persistence/src/sqlite/connection_extensions.rs @@ -0,0 +1,78 @@ +use anyhow::Context; +use sqlite::{ReadableWithIndex, Value}; + +use mithril_common::StdResult; + +use crate::sqlite::SqliteConnection; + +/// Extension trait for the [SqliteConnection] type. +pub trait ConnectionExtensions { + /// Execute the given sql query and return the value of the first cell read. + fn query_single_cell, T: ReadableWithIndex>( + &self, + sql: Q, + params: &[Value], + ) -> StdResult; +} + +impl ConnectionExtensions for SqliteConnection { + fn query_single_cell, T: ReadableWithIndex>( + &self, + sql: Q, + params: &[Value], + ) -> StdResult { + let mut statement = self.prepare(&sql).with_context(|| { + format!( + "Prepare query error: SQL=`{}`", + sql.as_ref().replace('\n', " ").trim() + ) + })?; + statement.bind(params)?; + statement.next()?; + statement + .read::(0) + .with_context(|| "Read query error") + } +} + +#[cfg(test)] +mod tests { + use sqlite::Connection; + + use super::*; + + #[test] + fn test_query_string() { + let connection = Connection::open_thread_safe(":memory:").unwrap(); + let value: String = connection.query_single_cell("select 'test'", &[]).unwrap(); + + assert_eq!(value, "test"); + } + + #[test] + fn test_query_max_number() { + let connection = Connection::open_thread_safe(":memory:").unwrap(); + let value: i64 = connection + .query_single_cell( + "select max(a) from (select 10 a union select 90 a union select 45 a)", + &[], + ) + .unwrap(); + + assert_eq!(value, 90); + } + + #[test] + fn test_query_with_params() { + let connection = Connection::open_thread_safe(":memory:").unwrap(); + let value: i64 = connection + .query_single_cell( + "select max(a) from (select 10 a union select 45 a union select 90 a) \ + where a > ? and a < ?", + &[Value::Integer(10), Value::Integer(90)], + ) + .unwrap(); + + assert_eq!(value, 45); + } +} diff --git a/internal/mithril-persistence/src/sqlite/mod.rs b/internal/mithril-persistence/src/sqlite/mod.rs index 646a111534e..fa29e942ebc 100644 --- a/internal/mithril-persistence/src/sqlite/mod.rs +++ b/internal/mithril-persistence/src/sqlite/mod.rs @@ -4,6 +4,7 @@ //! structs. mod condition; mod connection_builder; +mod connection_extensions; mod cursor; mod entity; mod projection; @@ -12,6 +13,7 @@ mod source_alias; pub use condition::{GetAllCondition, WhereCondition}; pub use connection_builder::{ConnectionBuilder, ConnectionOptions}; +pub use connection_extensions::ConnectionExtensions; pub use cursor::EntityCursor; pub use entity::{HydrationError, SqLiteEntity}; pub use projection::{Projection, ProjectionField}; diff --git a/mithril-aggregator/Cargo.toml b/mithril-aggregator/Cargo.toml index 2d9933375b4..01904a9a64a 100644 --- a/mithril-aggregator/Cargo.toml +++ b/mithril-aggregator/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mithril-aggregator" -version = "0.5.7" +version = "0.5.8" description = "A Mithril Aggregator server" authors = { workspace = true } edition = { workspace = true } diff --git a/mithril-aggregator/src/configuration.rs b/mithril-aggregator/src/configuration.rs index 11ecf0a9602..adae3b344f1 100644 --- a/mithril-aggregator/src/configuration.rs +++ b/mithril-aggregator/src/configuration.rs @@ -382,6 +382,12 @@ impl Default for DefaultConfiguration { } } +impl DefaultConfiguration { + fn namespace() -> String { + "default configuration".to_string() + } +} + impl From for ValueKind { fn from(value: ExecutionEnvironment) -> Self { match value { @@ -396,92 +402,55 @@ impl Source for DefaultConfiguration { Box::new(self.clone()) } - fn collect(&self) -> Result, config::ConfigError> { + fn collect(&self) -> Result, ConfigError> { + fn into_value>(value: V) -> Value { + Value::new(Some(&DefaultConfiguration::namespace()), value.into()) + } let mut result = Map::new(); - let namespace = "default configuration".to_string(); let myself = self.clone(); - result.insert( - "environment".to_string(), - Value::new(Some(&namespace), ValueKind::from(myself.environment)), - ); - result.insert( - "server_ip".to_string(), - Value::new(Some(&namespace), ValueKind::from(myself.server_ip)), - ); - result.insert( - "server_port".to_string(), - Value::new(Some(&namespace), ValueKind::from(myself.server_port)), - ); - result.insert( - "db_directory".to_string(), - Value::new(Some(&namespace), ValueKind::from(myself.db_directory)), - ); + result.insert("environment".to_string(), into_value(myself.environment)); + result.insert("server_ip".to_string(), into_value(myself.server_ip)); + result.insert("server_port".to_string(), into_value(myself.server_port)); + result.insert("db_directory".to_string(), into_value(myself.db_directory)); result.insert( "snapshot_directory".to_string(), - Value::new(Some(&namespace), ValueKind::from(myself.snapshot_directory)), + into_value(myself.snapshot_directory), ); result.insert( "snapshot_store_type".to_string(), - Value::new( - Some(&namespace), - ValueKind::from(myself.snapshot_store_type), - ), + into_value(myself.snapshot_store_type), ); result.insert( "snapshot_uploader_type".to_string(), - Value::new( - Some(&namespace), - ValueKind::from(myself.snapshot_uploader_type), - ), + into_value(myself.snapshot_uploader_type), ); result.insert( "era_reader_adapter_type".to_string(), - Value::new( - Some(&namespace), - ValueKind::from(myself.era_reader_adapter_type), - ), + into_value(myself.era_reader_adapter_type), ); result.insert( "reset_digests_cache".to_string(), - Value::new( - Some(&namespace), - ValueKind::from(myself.reset_digests_cache), - ), + into_value(myself.reset_digests_cache), ); result.insert( "disable_digests_cache".to_string(), - Value::new( - Some(&namespace), - ValueKind::from(myself.disable_digests_cache), - ), + into_value(myself.disable_digests_cache), ); result.insert( "snapshot_compression_algorithm".to_string(), - Value::new( - Some(&namespace), - ValueKind::from(myself.snapshot_compression_algorithm), - ), + into_value(myself.snapshot_compression_algorithm), ); result.insert( "snapshot_use_cdn_domain".to_string(), - Value::new( - Some(&namespace), - ValueKind::from(myself.snapshot_use_cdn_domain), - ), + into_value(myself.snapshot_use_cdn_domain), ); result.insert( "signer_importer_run_interval".to_string(), - Value::new( - Some(&namespace), - ValueKind::from(myself.signer_importer_run_interval), - ), + into_value(myself.signer_importer_run_interval), ); result.insert( "allow_unparsable_block".to_string(), - Value::new( - Some(&namespace), - ValueKind::from(myself.allow_unparsable_block), - ), + into_value(myself.allow_unparsable_block), ); Ok(result) diff --git a/mithril-signer/Cargo.toml b/mithril-signer/Cargo.toml index d5f23368c78..6a36a84582f 100644 --- a/mithril-signer/Cargo.toml +++ b/mithril-signer/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mithril-signer" -version = "0.2.135" +version = "0.2.136" description = "A Mithril Signer" authors = { workspace = true } edition = { workspace = true } diff --git a/mithril-signer/src/aggregator_client.rs b/mithril-signer/src/aggregator_client.rs index 6b5a1831bd4..f4b7ac8683b 100644 --- a/mithril-signer/src/aggregator_client.rs +++ b/mithril-signer/src/aggregator_client.rs @@ -414,36 +414,15 @@ mod tests { use mithril_common::era::{EraChecker, SupportedEra}; use mithril_common::messages::TryFromMessageAdapter; use serde_json::json; - use std::path::{Path, PathBuf}; use crate::configuration::Configuration; - use mithril_common::era::adapters::EraReaderAdapterType; use mithril_common::test_utils::fake_data; fn setup_test() -> (MockServer, Configuration, APIVersionProvider) { let server = MockServer::start(); let config = Configuration { - cardano_cli_path: PathBuf::new().join("cardano-cli"), - cardano_node_socket_path: PathBuf::new().join("whatever"), - network_magic: Some(42), - network: "testnet".to_string(), aggregator_endpoint: server.url(""), - relay_endpoint: None, - party_id: Some("0".to_string()), - run_interval: 100, - db_directory: Path::new("./db").to_path_buf(), - data_stores_directory: Path::new("./stores").to_path_buf(), - store_retention_limit: None, - kes_secret_key_path: None, - operational_certificate_path: None, - disable_digests_cache: false, - reset_digests_cache: false, - era_reader_adapter_type: EraReaderAdapterType::Bootstrap, - era_reader_adapter_params: None, - enable_metrics_server: true, - metrics_server_ip: "0.0.0.0".to_string(), - metrics_server_port: 9090, - allow_unparsable_block: false, + ..Configuration::new_sample("0") }; let era_checker = EraChecker::new(SupportedEra::dummy(), Epoch(1)); let api_version_provider = APIVersionProvider::new(Arc::new(era_checker)); diff --git a/mithril-signer/src/configuration.rs b/mithril-signer/src/configuration.rs index bed8a62348c..ac1590ee2b4 100644 --- a/mithril-signer/src/configuration.rs +++ b/mithril-signer/src/configuration.rs @@ -7,7 +7,7 @@ use std::{path::PathBuf, sync::Arc}; use mithril_common::{ chain_observer::ChainObserver, crypto_helper::tests_setup, - entities::PartyId, + entities::{BlockNumber, PartyId}, era::{ adapters::{EraReaderAdapterBuilder, EraReaderAdapterType}, EraReaderAdapter, @@ -27,14 +27,18 @@ pub struct Configuration { #[example = "`/tmp/cardano.sock`"] pub cardano_node_socket_path: PathBuf, + /// Cardano network + #[example = "`testnet` or `mainnet` or `devnet`"] + pub network: String, + /// Cardano Network Magic number /// useful for TestNet & DevNet #[example = "`1097911063` or `42`"] pub network_magic: Option, - /// Cardano network - #[example = "`testnet` or `mainnet` or `devnet`"] - pub network: String, + /// Also known as `k`, it defines the number of blocks that are required for the blockchain to + /// be considered final, preventing any further rollback `[default: 2160]`. + pub network_security_parameter: BlockNumber, /// Aggregator endpoint #[example = "`https://aggregator.pre-release-preview.api.mithril.network/aggregator`"] @@ -95,13 +99,19 @@ pub struct Configuration { /// /// Will be ignored on (pre)production networks. pub allow_unparsable_block: bool, + + /// If set, the signer will prune the cardano transactions in database older than the + /// [network_security_parameter][Self::network_security_parameter] blocks after each import + /// `[default: true]`. + pub enable_transaction_pruning: bool, } impl Configuration { /// Create a sample configuration mainly for tests #[doc(hidden)] - pub fn new_sample(party_id: &PartyId) -> Self { - let signer_temp_dir = tests_setup::setup_temp_directory_for_signer(party_id, false); + pub fn new_sample>(party_id: P) -> Self { + let party_id: PartyId = party_id.into(); + let signer_temp_dir = tests_setup::setup_temp_directory_for_signer(&party_id, false); Self { aggregator_endpoint: "http://0.0.0.0:8000".to_string(), relay_endpoint: None, @@ -110,7 +120,8 @@ impl Configuration { db_directory: PathBuf::new(), network: "devnet".to_string(), network_magic: Some(42), - party_id: Some(party_id.to_owned()), + network_security_parameter: 2160, + party_id: Some(party_id), run_interval: 5000, data_stores_directory: PathBuf::new(), store_retention_limit: None, @@ -126,6 +137,7 @@ impl Configuration { metrics_server_ip: "0.0.0.0".to_string(), metrics_server_port: 9090, allow_unparsable_block: false, + enable_transaction_pruning: false, } } @@ -186,6 +198,18 @@ pub struct DefaultConfiguration { /// Metrics HTTP server listening port. pub metrics_server_port: u16, + + /// Network security parameter + pub network_security_parameter: BlockNumber, + + /// Transaction pruning toggle + pub enable_transaction_pruning: bool, +} + +impl DefaultConfiguration { + fn namespace() -> String { + "default configuration".to_string() + } } impl Default for DefaultConfiguration { @@ -194,6 +218,8 @@ impl Default for DefaultConfiguration { era_reader_adapter_type: "bootstrap".to_string(), metrics_server_ip: "0.0.0.0".to_string(), metrics_server_port: 9090, + network_security_parameter: 2160, // 2160 is the mainnet value + enable_transaction_pruning: true, } } } @@ -204,29 +230,35 @@ impl Source for DefaultConfiguration { } fn collect(&self) -> Result, ConfigError> { + fn into_value>(value: V) -> Value { + Value::new(Some(&DefaultConfiguration::namespace()), value.into()) + } let mut result = Map::new(); - let namespace = "default configuration".to_string(); let myself = self.clone(); result.insert( "era_reader_adapter_type".to_string(), - Value::new( - Some(&namespace), - ValueKind::from(myself.era_reader_adapter_type), - ), + into_value(myself.era_reader_adapter_type), ); result.insert( "metrics_server_ip".to_string(), - Value::new(Some(&namespace), ValueKind::from(myself.metrics_server_ip)), + into_value(myself.metrics_server_ip), ); result.insert( "metrics_server_port".to_string(), - Value::new( - Some(&namespace), - ValueKind::from(myself.metrics_server_port), - ), + into_value(myself.metrics_server_port), + ); + + result.insert( + "network_security_parameter".to_string(), + into_value(myself.network_security_parameter), + ); + + result.insert( + "enable_transaction_pruning".to_string(), + into_value(myself.enable_transaction_pruning), ); Ok(result) diff --git a/mithril-signer/src/database/repository/cardano_transaction_repository.rs b/mithril-signer/src/database/repository/cardano_transaction_repository.rs index 68a8f464f74..d0d8c7c76b4 100644 --- a/mithril-signer/src/database/repository/cardano_transaction_repository.rs +++ b/mithril-signer/src/database/repository/cardano_transaction_repository.rs @@ -7,7 +7,7 @@ use mithril_common::entities::{BlockNumber, BlockRange, CardanoTransaction, Immu use mithril_common::StdResult; use mithril_persistence::database::repository::CardanoTransactionRepository; -use crate::TransactionStore; +use crate::{TransactionPruner, TransactionStore}; #[async_trait] impl TransactionStore for CardanoTransactionRepository { @@ -46,3 +46,10 @@ impl TransactionStore for CardanoTransactionRepository { Ok(()) } } + +#[async_trait] +impl TransactionPruner for CardanoTransactionRepository { + async fn prune(&self, number_of_blocks_to_keep: BlockNumber) -> StdResult<()> { + self.prune_transaction(number_of_blocks_to_keep).await + } +} diff --git a/mithril-signer/src/lib.rs b/mithril-signer/src/lib.rs index d483977f197..3d857a13f7d 100644 --- a/mithril-signer/src/lib.rs +++ b/mithril-signer/src/lib.rs @@ -15,6 +15,7 @@ pub mod metrics; mod protocol_initializer_store; mod runtime; mod single_signer; +mod transactions_importer_with_pruner; #[cfg(test)] pub use aggregator_client::dumb::DumbAggregatorClient; @@ -28,6 +29,7 @@ pub use metrics::*; pub use protocol_initializer_store::{ProtocolInitializerStore, ProtocolInitializerStorer}; pub use runtime::*; pub use single_signer::*; +pub use transactions_importer_with_pruner::*; /// HTTP request timeout duration in milliseconds const HTTP_REQUEST_TIMEOUT_DURATION: u64 = 30000; diff --git a/mithril-signer/src/runtime/runner.rs b/mithril-signer/src/runtime/runner.rs index b5eb89e9dc3..e75611b0102 100644 --- a/mithril-signer/src/runtime/runner.rs +++ b/mithril-signer/src/runtime/runner.rs @@ -459,10 +459,7 @@ mod tests { crypto_helper::{MKMap, MKMapNode, MKTreeNode, ProtocolInitializer}, digesters::{DumbImmutableDigester, DumbImmutableFileObserver}, entities::{BlockRange, CardanoDbBeacon, Epoch, ImmutableFileNumber, StakeDistribution}, - era::{ - adapters::{EraReaderAdapterType, EraReaderBootstrapAdapter}, - EraChecker, EraReader, - }, + era::{adapters::EraReaderBootstrapAdapter, EraChecker, EraReader}, signable_builder::{ BlockRangeRootRetriever, CardanoImmutableFilesFullSignableBuilder, CardanoTransactionsSignableBuilder, MithrilSignableBuilderService, @@ -474,10 +471,7 @@ mod tests { use mithril_persistence::store::adapter::{DumbStoreAdapter, MemoryAdapter}; use mithril_persistence::store::{StakeStore, StakeStorer}; use mockall::mock; - use std::{ - path::{Path, PathBuf}, - sync::Arc, - }; + use std::{path::Path, sync::Arc}; use crate::{ metrics::MetricsService, AggregatorClient, CardanoTransactionsImporter, @@ -597,34 +591,9 @@ mod tests { maybe_services: Option, maybe_config: Option, ) -> SignerRunner { - let services = init_services().await; - let config = Configuration { - aggregator_endpoint: "http://0.0.0.0:3000".to_string(), - relay_endpoint: None, - cardano_cli_path: PathBuf::new(), - cardano_node_socket_path: PathBuf::new(), - db_directory: PathBuf::new(), - network: "whatever".to_string(), - network_magic: None, - party_id: Some("1".to_string()), - run_interval: 100, - data_stores_directory: PathBuf::new(), - kes_secret_key_path: None, - operational_certificate_path: None, - disable_digests_cache: false, - store_retention_limit: None, - reset_digests_cache: false, - era_reader_adapter_type: EraReaderAdapterType::Bootstrap, - era_reader_adapter_params: None, - enable_metrics_server: true, - metrics_server_ip: "0.0.0.0".to_string(), - metrics_server_port: 9090, - allow_unparsable_block: false, - }; - SignerRunner::new( - maybe_config.unwrap_or(config), - maybe_services.unwrap_or(services), + maybe_config.unwrap_or(Configuration::new_sample("1")), + maybe_services.unwrap_or(init_services().await), ) } diff --git a/mithril-signer/src/runtime/signer_services.rs b/mithril-signer/src/runtime/signer_services.rs index 95a6c157bd1..1472149ee1a 100644 --- a/mithril-signer/src/runtime/signer_services.rs +++ b/mithril-signer/src/runtime/signer_services.rs @@ -29,8 +29,8 @@ use mithril_persistence::{ use crate::{ aggregator_client::AggregatorClient, metrics::MetricsService, single_signer::SingleSigner, AggregatorHTTPClient, CardanoTransactionsImporter, Configuration, MithrilSingleSigner, - ProtocolInitializerStore, ProtocolInitializerStorer, HTTP_REQUEST_TIMEOUT_DURATION, - SQLITE_FILE, SQLITE_FILE_CARDANO_TRANSACTION, + ProtocolInitializerStore, ProtocolInitializerStorer, TransactionsImporterWithPruner, + HTTP_REQUEST_TIMEOUT_DURATION, SQLITE_FILE, SQLITE_FILE_CARDANO_TRANSACTION, }; type StakeStoreService = Arc; @@ -265,17 +265,25 @@ impl<'a> ServiceBuilder for ProductionServiceBuilder<'a> { let transaction_store = Arc::new(CardanoTransactionRepository::new( transaction_sqlite_connection, )); - let transactions_importer = CardanoTransactionsImporter::new( + let transactions_importer = Arc::new(CardanoTransactionsImporter::new( block_scanner, transaction_store.clone(), &self.config.db_directory, // Rescan the last immutable when importing transactions, it may have been partially imported Some(1), slog_scope::logger(), - ); + )); + // Wrap the transaction importer with decorator to prune the transactions after import + let transactions_importer = Arc::new(TransactionsImporterWithPruner::new( + self.config + .enable_transaction_pruning + .then_some(self.config.network_security_parameter), + transaction_store.clone(), + transactions_importer, + )); let block_range_root_retriever = transaction_store.clone(); let cardano_transactions_builder = Arc::new(CardanoTransactionsSignableBuilder::new( - Arc::new(transactions_importer), + transactions_importer, block_range_root_retriever, slog_scope::logger(), )); @@ -350,7 +358,6 @@ mod tests { chain_observer::FakeObserver, digesters::DumbImmutableFileObserver, entities::{Epoch, TimePoint}, - era::adapters::EraReaderAdapterType, test_utils::TempDir, }; @@ -366,27 +373,8 @@ mod tests { async fn test_auto_create_stores_directory() { let stores_dir = get_test_dir("test_auto_create_stores_directory").join("stores"); let config = Configuration { - cardano_cli_path: PathBuf::new(), - cardano_node_socket_path: PathBuf::new(), - network_magic: None, - network: "preview".to_string(), - aggregator_endpoint: "".to_string(), - relay_endpoint: None, - party_id: Some("party-123456".to_string()), - run_interval: 1000, - db_directory: PathBuf::new(), data_stores_directory: stores_dir.clone(), - store_retention_limit: None, - kes_secret_key_path: None, - operational_certificate_path: None, - disable_digests_cache: false, - reset_digests_cache: false, - era_reader_adapter_type: EraReaderAdapterType::Bootstrap, - era_reader_adapter_params: None, - enable_metrics_server: true, - metrics_server_ip: "0.0.0.0".to_string(), - metrics_server_port: 9090, - allow_unparsable_block: false, + ..Configuration::new_sample("party-123456") }; assert!(!stores_dir.exists()); diff --git a/mithril-signer/src/transactions_importer_with_pruner.rs b/mithril-signer/src/transactions_importer_with_pruner.rs new file mode 100644 index 00000000000..6f0e34c875c --- /dev/null +++ b/mithril-signer/src/transactions_importer_with_pruner.rs @@ -0,0 +1,129 @@ +use std::sync::Arc; + +use async_trait::async_trait; + +use mithril_common::entities::{BlockNumber, ImmutableFileNumber}; +use mithril_common::signable_builder::TransactionsImporter; +use mithril_common::StdResult; + +/// Cardano transactions pruner +#[cfg_attr(test, mockall::automock)] +#[async_trait] +pub trait TransactionPruner: Send + Sync { + /// Prune the transactions older than the given number of blocks. + async fn prune(&self, number_of_blocks_to_keep: BlockNumber) -> StdResult<()>; +} + +/// A decorator of [TransactionsImporter] that prunes the transactions older than a given number of +/// blocks after running the import. +/// +/// If the number of blocks to keep is not provided, no pruning is performed. +pub struct TransactionsImporterWithPruner { + number_of_blocks_to_keep: Option, + transaction_pruner: Arc, + wrapped_importer: Arc, +} + +impl TransactionsImporterWithPruner { + /// Create a new instance of [TransactionsImporterWithPruner]. + pub fn new( + number_of_blocks_to_keep: Option, + transaction_pruner: Arc, + wrapped_importer: Arc, + ) -> Self { + Self { + number_of_blocks_to_keep, + transaction_pruner, + wrapped_importer, + } + } +} + +#[async_trait] +impl TransactionsImporter for TransactionsImporterWithPruner { + async fn import(&self, up_to_beacon: ImmutableFileNumber) -> StdResult<()> { + self.wrapped_importer.import(up_to_beacon).await?; + + if let Some(number_of_blocks_to_keep) = self.number_of_blocks_to_keep { + self.transaction_pruner + .prune(number_of_blocks_to_keep) + .await?; + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use mockall::mock; + use mockall::predicate::eq; + + use super::*; + + mock! { + pub TransactionImporterImpl {} + + #[async_trait] + impl TransactionsImporter for TransactionImporterImpl { + async fn import(&self, up_to_beacon: ImmutableFileNumber) -> StdResult<()>; + } + } + + impl TransactionsImporterWithPruner { + pub fn new_with_mock( + number_of_blocks_to_keep: Option, + transaction_pruner_mock_config: P, + importer_mock_config: I, + ) -> Self + where + P: FnOnce(&mut MockTransactionPruner), + I: FnOnce(&mut MockTransactionImporterImpl), + { + let mut transaction_pruner = MockTransactionPruner::new(); + transaction_pruner_mock_config(&mut transaction_pruner); + let mut transaction_importer = MockTransactionImporterImpl::new(); + importer_mock_config(&mut transaction_importer); + + Self::new( + number_of_blocks_to_keep, + Arc::new(transaction_pruner), + Arc::new(transaction_importer), + ) + } + } + + #[tokio::test] + async fn test_does_not_prune_if_none_is_configured() { + let importer = TransactionsImporterWithPruner::new_with_mock( + None, + |mock| { + mock.expect_prune().never(); + }, + |mock| { + mock.expect_import().once().returning(|_| Ok(())); + }, + ); + + importer.import(10).await.expect("Import should not fail"); + } + + #[tokio::test] + async fn test_does_prune_if_a_block_number_is_configured() { + let expected_block_number: BlockNumber = 5; + let importer = TransactionsImporterWithPruner::new_with_mock( + Some(expected_block_number), + |mock| { + mock.expect_prune() + .with(eq(expected_block_number)) + .once() + .returning(|_| Ok(())); + }, + |mock| { + mock.expect_import().once().returning(|_| Ok(())); + }, + ); + + importer.import(10).await.expect("Import should not fail"); + } +}