From 4cee1087d3dd1acbe644df687ebf5aa836f035b4 Mon Sep 17 00:00:00 2001 From: Stan Bondi Date: Thu, 27 Feb 2025 17:06:08 +0400 Subject: [PATCH] fix(consensus)!: mutate validator fee substate using deposit/withdrawal --- .../src/command/transaction.rs | 2 +- .../src/handlers/validator.rs | 2 +- .../src/p2p/services/mempool/service.rs | 6 +- .../src/command/transaction.rs | 4 +- bindings/dist/helpers/helpers.d.ts | 2 +- bindings/dist/index.d.ts | 1 + bindings/dist/index.js | 1 + bindings/dist/types/SubstateDiff.d.ts | 2 + .../dist/types/ValidatorFeeWithdrawal.d.ts | 6 + bindings/dist/types/ValidatorFeeWithdrawal.js | 1 + .../ValidatorNodeChange.d.ts | 2 + bindings/src/helpers/helpers.ts | 6 +- bindings/src/index.ts | 1 + bindings/src/types/SubstateDiff.ts | 2 + bindings/src/types/ValidatorFeeWithdrawal.ts | 8 + .../ValidatorNodeChange.ts | 3 +- clients/wallet_daemon_client/src/types.rs | 2 +- dan_layer/common_types/src/committee.rs | 36 +-- dan_layer/common_types/src/fee_pool.rs | 2 +- dan_layer/common_types/src/shard_group.rs | 7 + dan_layer/common_types/src/substate_type.rs | 8 +- .../common_types/src/versioned_substate_id.rs | 6 + .../src/hotstuff/block_change_set.rs | 4 +- dan_layer/consensus/src/hotstuff/common.rs | 85 ++++--- .../consensus/src/hotstuff/on_propose.rs | 218 ++++++++++++++---- .../on_ready_to_vote_on_local_block.rs | 5 +- .../src/hotstuff/substate_store/error.rs | 2 + .../hotstuff/substate_store/pending_store.rs | 196 +++++++++++++--- .../hotstuff/transaction_manager/manager.rs | 12 +- dan_layer/consensus_tests/src/consensus.rs | 108 ++++++++- .../src/support/epoch_manager.rs | 33 ++- .../src/support/executions_store.rs | 3 +- .../consensus_tests/src/support/harness.rs | 24 +- .../src/support/transaction.rs | 19 +- .../src/support/transaction_executor.rs | 9 + .../src/support/validator/builder.rs | 8 + dan_layer/engine/src/runtime/impl.rs | 8 +- dan_layer/engine/src/runtime/mod.rs | 2 +- dan_layer/engine/src/runtime/tracker.rs | 3 +- dan_layer/engine/src/runtime/working_state.rs | 46 ++-- dan_layer/engine_types/src/indexed_value.rs | 2 +- dan_layer/engine_types/src/instruction.rs | 2 +- dan_layer/engine_types/src/lib.rs | 4 +- dan_layer/engine_types/src/substate.rs | 46 +++- dan_layer/engine_types/src/substate_serde.rs | 2 +- .../src/{vn_fee_pool.rs => validator_fee.rs} | 63 ++++- .../up.sql | 2 +- dan_layer/state_store_sqlite/src/schema.rs | 11 + dan_layer/static_epoch_oracle/Cargo.toml | 10 - dan_layer/static_epoch_oracle/src/config.rs | 4 - dan_layer/static_epoch_oracle/src/lib.rs | 5 - dan_layer/static_epoch_oracle/src/oracle.rs | 19 -- .../storage/src/consensus_models/block.rs | 14 +- .../src/consensus_models/block_diff.rs | 9 +- .../src/consensus_models/lock_intent.rs | 6 + .../src/consensus_models/substate_change.rs | 8 +- dan_layer/template_lib/src/models/amount.rs | 11 +- dan_layer/transaction/src/builder.rs | 2 +- utilities/tariswap_test_bench/src/runner.rs | 1 + .../tariswap_test_bench/src/templates.rs | 4 +- 60 files changed, 835 insertions(+), 285 deletions(-) create mode 100644 bindings/dist/types/ValidatorFeeWithdrawal.d.ts create mode 100644 bindings/dist/types/ValidatorFeeWithdrawal.js create mode 100644 bindings/src/types/ValidatorFeeWithdrawal.ts rename dan_layer/engine_types/src/{vn_fee_pool.rs => validator_fee.rs} (64%) delete mode 100644 dan_layer/static_epoch_oracle/Cargo.toml delete mode 100644 dan_layer/static_epoch_oracle/src/config.rs delete mode 100644 dan_layer/static_epoch_oracle/src/lib.rs delete mode 100644 dan_layer/static_epoch_oracle/src/oracle.rs diff --git a/applications/tari_dan_wallet_cli/src/command/transaction.rs b/applications/tari_dan_wallet_cli/src/command/transaction.rs index d4651eada1..6617ec41da 100644 --- a/applications/tari_dan_wallet_cli/src/command/transaction.rs +++ b/applications/tari_dan_wallet_cli/src/command/transaction.rs @@ -547,7 +547,7 @@ pub fn print_substate_diff(diff: &SubstateDiff) { }, SubstateValue::ValidatorFeePool(pool) => { println!(" ▶ Validator Fee Pool: {}", address); - println!(" ▶ Total fees: {}", pool.amount); + println!(" ▶ Total fees: {}", pool.amount()); }, } println!(); diff --git a/applications/tari_dan_wallet_daemon/src/handlers/validator.rs b/applications/tari_dan_wallet_daemon/src/handlers/validator.rs index cfc90a109f..c32a9b6730 100644 --- a/applications/tari_dan_wallet_daemon/src/handlers/validator.rs +++ b/applications/tari_dan_wallet_daemon/src/handlers/validator.rs @@ -89,7 +89,7 @@ pub async fn handle_get_validator_fees( continue; }; - let Some(amount) = result.substate.as_validator_fee_pool().map(|p| p.amount) else { + let Some(amount) = result.substate.as_validator_fee_pool().map(|p| p.amount()) else { warn!(target: LOG_TARGET, "Incorrect substate type found at address {}", address); continue; }; diff --git a/applications/tari_validator_node/src/p2p/services/mempool/service.rs b/applications/tari_validator_node/src/p2p/services/mempool/service.rs index 0c26d4c090..84bc8500da 100644 --- a/applications/tari_validator_node/src/p2p/services/mempool/service.rs +++ b/applications/tari_validator_node/src/p2p/services/mempool/service.rs @@ -20,7 +20,7 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -use std::{collections::HashSet, fmt::Display, iter}; +use std::{collections::HashSet, fmt::Display}; use libp2p::{gossipsub, PeerId}; use log::*; @@ -248,10 +248,10 @@ where TValidator: Validator { println!(" ▶ fee_pool: {}", address); - println!(" ▶ amount: {}", fee_pool.amount); - println!(" ▶ recipient: {}", fee_pool.claim_public_key); + println!(" ▶ amount: {}", fee_pool.amount()); + println!(" ▶ recipient: {}", fee_pool.claim_public_key()); }, SubstateValue::Template(_) => { println!(" ▶ Template: {}", address); diff --git a/bindings/dist/helpers/helpers.d.ts b/bindings/dist/helpers/helpers.d.ts index 5d70bc7263..e28adc7388 100644 --- a/bindings/dist/helpers/helpers.d.ts +++ b/bindings/dist/helpers/helpers.d.ts @@ -5,7 +5,7 @@ import { SubstateId } from "../types/SubstateId"; import { TransactionResult } from "../types/TransactionResult"; export declare function substateIdToString(substateId: SubstateId | string | null | undefined): string; export declare function stringToSubstateId(substateId: string): SubstateId; -export declare function shortenSubstateId(substateId: SubstateId | null | undefined, start?: number, end?: number): string; +export declare function shortenSubstateId(substateId: SubstateId | string | null | undefined, start?: number, end?: number): string; export declare function shortenString(string: string, start?: number, end?: number): string; export declare function rejectReasonToString(reason: RejectReason | null): string; export declare function getSubstateDiffFromTransactionResult(result: TransactionResult): SubstateDiff | null; diff --git a/bindings/dist/index.d.ts b/bindings/dist/index.d.ts index 4848d1ca90..153a3e457a 100644 --- a/bindings/dist/index.d.ts +++ b/bindings/dist/index.d.ts @@ -117,6 +117,7 @@ export * from "./types/UnsignedTransaction"; export * from "./types/UnsignedTransactionV1"; export * from "./types/ValidatorFeePoolAddress"; export * from "./types/ValidatorFeePool"; +export * from "./types/ValidatorFeeWithdrawal"; export * from "./types/ValidatorSignature"; export * from "./types/VaultId"; export * from "./types/Vault"; diff --git a/bindings/dist/index.js b/bindings/dist/index.js index 3a17a7a850..5cd437b56b 100644 --- a/bindings/dist/index.js +++ b/bindings/dist/index.js @@ -119,6 +119,7 @@ export * from "./types/UnsignedTransaction"; export * from "./types/UnsignedTransactionV1"; export * from "./types/ValidatorFeePoolAddress"; export * from "./types/ValidatorFeePool"; +export * from "./types/ValidatorFeeWithdrawal"; export * from "./types/ValidatorSignature"; export * from "./types/VaultId"; export * from "./types/Vault"; diff --git a/bindings/dist/types/SubstateDiff.d.ts b/bindings/dist/types/SubstateDiff.d.ts index 6be5803ec9..bdaa085557 100644 --- a/bindings/dist/types/SubstateDiff.d.ts +++ b/bindings/dist/types/SubstateDiff.d.ts @@ -1,6 +1,8 @@ import type { Substate } from "./Substate"; import type { SubstateId } from "./SubstateId"; +import type { ValidatorFeeWithdrawal } from "./ValidatorFeeWithdrawal"; export interface SubstateDiff { up_substates: Array<[SubstateId, Substate]>; down_substates: Array<[SubstateId, number]>; + fee_withdrawals: Array; } diff --git a/bindings/dist/types/ValidatorFeeWithdrawal.d.ts b/bindings/dist/types/ValidatorFeeWithdrawal.d.ts new file mode 100644 index 0000000000..deef0aeab1 --- /dev/null +++ b/bindings/dist/types/ValidatorFeeWithdrawal.d.ts @@ -0,0 +1,6 @@ +import type { Amount } from "./Amount"; +import type { ValidatorFeePoolAddress } from "./ValidatorFeePoolAddress"; +export interface ValidatorFeeWithdrawal { + address: ValidatorFeePoolAddress; + amount: Amount; +} diff --git a/bindings/dist/types/ValidatorFeeWithdrawal.js b/bindings/dist/types/ValidatorFeeWithdrawal.js new file mode 100644 index 0000000000..cb0ff5c3b5 --- /dev/null +++ b/bindings/dist/types/ValidatorFeeWithdrawal.js @@ -0,0 +1 @@ +export {}; diff --git a/bindings/dist/types/validator-node-client/ValidatorNodeChange.d.ts b/bindings/dist/types/validator-node-client/ValidatorNodeChange.d.ts index e2b24ea999..683eb8ea9c 100644 --- a/bindings/dist/types/validator-node-client/ValidatorNodeChange.d.ts +++ b/bindings/dist/types/validator-node-client/ValidatorNodeChange.d.ts @@ -1,9 +1,11 @@ import type { Epoch } from "../Epoch"; +import type { SubstateAddress } from "../SubstateAddress"; export type ValidatorNodeChange = { Add: { public_key: string; activation_epoch: Epoch; minimum_value_promise: bigint; + shard_key: SubstateAddress; }; } | { Remove: { diff --git a/bindings/src/helpers/helpers.ts b/bindings/src/helpers/helpers.ts index 9cb30676eb..2b2c28b8db 100644 --- a/bindings/src/helpers/helpers.ts +++ b/bindings/src/helpers/helpers.ts @@ -73,7 +73,11 @@ export function stringToSubstateId(substateId: string): SubstateId { } } -export function shortenSubstateId(substateId: SubstateId | null | undefined, start: number = 4, end: number = 4) { +export function shortenSubstateId( + substateId: SubstateId | string | null | undefined, + start: number = 4, + end: number = 4, +) { if (substateId === null || substateId === undefined) { return ""; } diff --git a/bindings/src/index.ts b/bindings/src/index.ts index d0afbf6155..aafd99b174 100644 --- a/bindings/src/index.ts +++ b/bindings/src/index.ts @@ -120,6 +120,7 @@ export * from "./types/UnsignedTransaction"; export * from "./types/UnsignedTransactionV1"; export * from "./types/ValidatorFeePoolAddress"; export * from "./types/ValidatorFeePool"; +export * from "./types/ValidatorFeeWithdrawal"; export * from "./types/ValidatorSignature"; export * from "./types/VaultId"; export * from "./types/Vault"; diff --git a/bindings/src/types/SubstateDiff.ts b/bindings/src/types/SubstateDiff.ts index 74fc6a30de..1e064768ff 100644 --- a/bindings/src/types/SubstateDiff.ts +++ b/bindings/src/types/SubstateDiff.ts @@ -1,8 +1,10 @@ // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. import type { Substate } from "./Substate"; import type { SubstateId } from "./SubstateId"; +import type { ValidatorFeeWithdrawal } from "./ValidatorFeeWithdrawal"; export interface SubstateDiff { up_substates: Array<[SubstateId, Substate]>; down_substates: Array<[SubstateId, number]>; + fee_withdrawals: Array; } diff --git a/bindings/src/types/ValidatorFeeWithdrawal.ts b/bindings/src/types/ValidatorFeeWithdrawal.ts new file mode 100644 index 0000000000..8d0f8b7f63 --- /dev/null +++ b/bindings/src/types/ValidatorFeeWithdrawal.ts @@ -0,0 +1,8 @@ +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. +import type { Amount } from "./Amount"; +import type { ValidatorFeePoolAddress } from "./ValidatorFeePoolAddress"; + +export interface ValidatorFeeWithdrawal { + address: ValidatorFeePoolAddress; + amount: Amount; +} diff --git a/bindings/src/types/validator-node-client/ValidatorNodeChange.ts b/bindings/src/types/validator-node-client/ValidatorNodeChange.ts index 40ab2a3e4d..2619e5366a 100644 --- a/bindings/src/types/validator-node-client/ValidatorNodeChange.ts +++ b/bindings/src/types/validator-node-client/ValidatorNodeChange.ts @@ -1,6 +1,7 @@ // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. import type { Epoch } from "../Epoch"; +import type { SubstateAddress } from "../SubstateAddress"; export type ValidatorNodeChange = - | { Add: { public_key: string; activation_epoch: Epoch; minimum_value_promise: bigint } } + | { Add: { public_key: string; activation_epoch: Epoch; minimum_value_promise: bigint; shard_key: SubstateAddress } } | { Remove: { public_key: string } }; diff --git a/clients/wallet_daemon_client/src/types.rs b/clients/wallet_daemon_client/src/types.rs index 303821aa7c..3ac16e6f5c 100644 --- a/clients/wallet_daemon_client/src/types.rs +++ b/clients/wallet_daemon_client/src/types.rs @@ -42,8 +42,8 @@ use tari_engine_types::{ instruction_result::InstructionResult, serde_with, substate::{SubstateId, SubstateValue}, - vn_fee_pool::ValidatorFeePoolAddress, TemplateAddress, + ValidatorFeePoolAddress, }; use tari_template_abi::TemplateDef; use tari_template_lib::{ diff --git a/dan_layer/common_types/src/committee.rs b/dan_layer/common_types/src/committee.rs index 79d65fe6bd..47d26ed297 100644 --- a/dan_layer/common_types/src/committee.rs +++ b/dan_layer/common_types/src/committee.rs @@ -1,7 +1,7 @@ // Copyright 2023 The Tari Project // SPDX-License-Identifier: BSD-3-Clause -use std::{borrow::Borrow, cmp, ops::RangeInclusive}; +use std::{cmp, ops::RangeInclusive}; use rand::{rngs::OsRng, seq::SliceRandom}; use serde::{Deserialize, Serialize}; @@ -254,32 +254,14 @@ impl CommitteeInfo { self.shard_group.contains(&shard) } - pub fn includes_all_substate_addresses, B: Borrow>( - &self, - substate_addresses: I, - ) -> bool { - substate_addresses - .into_iter() - .all(|substate_address| self.includes_substate_address(substate_address.borrow())) - } - - pub fn includes_any_address, B: Borrow>( - &self, - substate_addresses: I, - ) -> bool { - substate_addresses - .into_iter() - .any(|substate_address| self.includes_substate_address(substate_address.borrow())) - } - - pub fn filter<'a, I, B>(&'a self, items: I) -> impl Iterator + 'a - where - I: IntoIterator + 'a, - B: Borrow, - { - items - .into_iter() - .filter(|substate_address| self.includes_substate_address(substate_address.borrow())) + pub fn is_all_local, I: IntoIterator>(&self, substate_ids: I) -> bool { + substate_ids.into_iter().all(|substate_id| { + let substate_id = substate_id.as_ref(); + if substate_id.is_global() && self.num_committees > 1 { + return false; + } + self.includes_substate_id(substate_id) + }) } pub fn all_shard_groups_iter(&self) -> impl Iterator { diff --git a/dan_layer/common_types/src/fee_pool.rs b/dan_layer/common_types/src/fee_pool.rs index 588eb50b7e..ea9d06da29 100644 --- a/dan_layer/common_types/src/fee_pool.rs +++ b/dan_layer/common_types/src/fee_pool.rs @@ -1,7 +1,7 @@ // Copyright 2025 The Tari Project // SPDX-License-Identifier: BSD-3-Clause -use tari_engine_types::vn_fee_pool::ValidatorFeePoolAddress; +use tari_engine_types::ValidatorFeePoolAddress; use crate::{shard::Shard, uint::U256, NumPreshards}; diff --git a/dan_layer/common_types/src/shard_group.rs b/dan_layer/common_types/src/shard_group.rs index 9fee899820..72818607e2 100644 --- a/dan_layer/common_types/src/shard_group.rs +++ b/dan_layer/common_types/src/shard_group.rs @@ -91,6 +91,13 @@ impl ShardGroup { self.as_range().contains(shard) } + pub fn contains_or_global(&self, shard: &Shard) -> bool { + if shard.is_global() { + return true; + } + self.contains(shard) + } + pub fn overlaps_shard_group(&self, other: &ShardGroup) -> bool { self.start <= other.end_inclusive && self.end_inclusive >= other.start } diff --git a/dan_layer/common_types/src/substate_type.rs b/dan_layer/common_types/src/substate_type.rs index 56e3f71f2b..7687848ced 100644 --- a/dan_layer/common_types/src/substate_type.rs +++ b/dan_layer/common_types/src/substate_type.rs @@ -4,7 +4,7 @@ use std::fmt::Display; use serde::{Deserialize, Serialize}; -use tari_engine_types::substate::SubstateValue; +use tari_engine_types::substate::{Substate, SubstateValue}; #[derive(Debug, Clone, Copy, Deserialize, Serialize)] #[cfg_attr( @@ -56,6 +56,12 @@ impl From<&SubstateValue> for SubstateType { } } +impl From<&Substate> for SubstateType { + fn from(value: &Substate) -> Self { + value.substate_value().into() + } +} + impl Display for SubstateType { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{}", self.as_prefix_str()) diff --git a/dan_layer/common_types/src/versioned_substate_id.rs b/dan_layer/common_types/src/versioned_substate_id.rs index 39d0a1a975..dc03078aea 100644 --- a/dan_layer/common_types/src/versioned_substate_id.rs +++ b/dan_layer/common_types/src/versioned_substate_id.rs @@ -255,6 +255,12 @@ impl PartialEq for SubstateRequirementRef<'_> { impl Eq for SubstateRequirementRef<'_> {} +impl AsRef for SubstateRequirementRef<'_> { + fn as_ref(&self) -> &SubstateId { + self.substate_id + } +} + // Only consider the substate id in maps. This means that duplicates found if the substate id is the same regardless of // the version. impl std::hash::Hash for SubstateRequirementRef<'_> { diff --git a/dan_layer/consensus/src/hotstuff/block_change_set.rs b/dan_layer/consensus/src/hotstuff/block_change_set.rs index c448d72782..afd0afb6a0 100644 --- a/dan_layer/consensus/src/hotstuff/block_change_set.rs +++ b/dan_layer/consensus/src/hotstuff/block_change_set.rs @@ -358,9 +358,9 @@ impl ProposedBlockChangeSet { let _timer = TraceTimer::debug(LOG_TARGET, "ProposedBlockChangeSet::save"); // Store the block diff - BlockDiff::insert_record(tx, &self.block.block_id, &self.substate_changes)?; + BlockDiff::insert(tx, &self.block.block_id, &self.substate_changes)?; - // Store the tree diffs for each effected shard + // Store the tree diffs for each affected shard for (shard, diff) in &self.state_tree_diffs { PendingShardStateTreeDiff::create(tx, *self.block.block_id(), *shard, diff)?; } diff --git a/dan_layer/consensus/src/hotstuff/common.rs b/dan_layer/consensus/src/hotstuff/common.rs index 793ea49390..4aa1239c93 100644 --- a/dan_layer/consensus/src/hotstuff/common.rs +++ b/dan_layer/consensus/src/hotstuff/common.rs @@ -14,7 +14,6 @@ use tari_crypto::tari_utilities::ByteArray; use tari_dan_common_types::{ committee::{Committee, CommitteeInfo}, derive_fee_pool_address, - optional::Optional, shard::Shard, substate_type::SubstateType, Epoch, @@ -22,7 +21,6 @@ use tari_dan_common_types::{ NodeHeight, NumPreshards, ShardGroup, - VersionedSubstateId, }; use tari_dan_storage::{ consensus_models::{ @@ -42,11 +40,7 @@ use tari_dan_storage::{ StateStoreWriteTransaction, StorageError, }; -use tari_engine_types::{ - substate::{Substate, SubstateDiff, SubstateId}, - template_models::Amount, - vn_fee_pool::ValidatorFeePool, -}; +use tari_engine_types::{substate::SubstateDiff, template_models::Amount, ValidatorFeePool}; use tari_state_tree::{JellyfishMerkleTree, StateTreeError}; use crate::{ @@ -54,7 +48,7 @@ use crate::{ substate_store::{PendingSubstateStore, ShardScopedTreeStoreReader, ShardedStateTree}, HotStuffError, }, - traits::{LeaderStrategy, WriteableSubstateStore}, + traits::LeaderStrategy, }; const LOG_TARGET: &str = "tari::dan::consensus::hotstuff::common"; @@ -328,6 +322,13 @@ pub(crate) fn filter_diff_for_committee(committee_info: &CommitteeInfo, diff: &S diff.down_iter() .filter(|(id, _)| committee_info.includes_substate_id(id)) .cloned(), + ) + .set_once_fee_withdrawals( + diff.validator_fee_withdrawals() + .iter() + .filter(|f| committee_info.includes_substate_id(&f.address.into())) + .cloned() + .collect(), ); filtered_diff } @@ -374,46 +375,38 @@ pub fn apply_leader_fee_to_substate_store( num_preshards: NumPreshards, total_leader_fee: Amount, ) -> Result<(), HotStuffError> { + // Basic defensive checks + assert!( + total_leader_fee.is_positive(), + "apply_leader_fee_to_substate_store: total_leader_fee ({total_leader_fee}) must be positive" + ); + if total_leader_fee.is_zero() { + // Nothing to do + return Ok(()); + } + let fee_substate_id = derive_fee_pool_address(claim_public_key_bytes, num_preshards, shard); + store.update_in_place( + &fee_substate_id.into(), + |value_mut| { + debug!(target: LOG_TARGET, "🪙 Deposit leader fee {total_leader_fee} into fee pool {fee_substate_id}"); + let substate_type = SubstateType::from(&*value_mut); + let pool = value_mut.as_validator_fee_pool_mut().ok_or_else(|| { + HotStuffError::InvariantError(format!( + "Unexpected substate type {} was found at address {}", + substate_type, fee_substate_id + )) + })?; + if !pool.deposit_direct(total_leader_fee) { + return Err(HotStuffError::InvariantError(format!( + "Failed to deposit leader fee {total_leader_fee} into fee pool {fee_substate_id}" + ))); + } - let substate_id = SubstateId::from(fee_substate_id); - let (next_amount, next_version) = if let Some(latest) = store.get_latest_change(&substate_id).optional()? { - match latest { - SubstateChange::Up { id, substate, .. } => { - debug!(target: LOG_TARGET, "DOWN current fee pool {id}"); - let version = id.version(); - store.put(SubstateChange::Down { - id, - shard, - transaction_id: Default::default(), - })?; - let pool = substate.substate_value().as_validator_fee_pool().ok_or_else(|| { - HotStuffError::InvariantError(format!( - "Unexpected substate type {} was found at address {}", - SubstateType::from(substate.substate_value()), - substate_id - )) - })?; - (pool.amount + total_leader_fee, version + 1) - }, - // If the latest fee pool is a Down, it was previously claimed - SubstateChange::Down { id, .. } => (total_leader_fee, id.version() + 1), - } - } else { - (Amount::zero(), 0) - }; - - let id = VersionedSubstateId::new(fee_substate_id, next_version); - debug!(target: LOG_TARGET, "UP new fee pool {id}"); - store.put(SubstateChange::Up { - id, - shard, - transaction_id: Default::default(), - substate: Substate::new( - next_version, - ValidatorFeePool::new(claim_public_key_bytes.into(), next_amount), - ), - })?; + Ok(()) + }, + |_| Ok(ValidatorFeePool::new(claim_public_key_bytes.into(), total_leader_fee).into()), + )?; Ok(()) } diff --git a/dan_layer/consensus/src/hotstuff/on_propose.rs b/dan_layer/consensus/src/hotstuff/on_propose.rs index 690f48a954..718506690f 100644 --- a/dan_layer/consensus/src/hotstuff/on_propose.rs +++ b/dan_layer/consensus/src/hotstuff/on_propose.rs @@ -297,38 +297,7 @@ where TConsensusSpec: ConsensusSpec lock_conflicts, ), // Leader thinks all local nodes have prepared - TransactionPoolStage::Prepared => { - if tx_rec.current_decision().is_abort() { - let atom = tx_rec.get_current_transaction_atom(); - return Ok(Some(Command::LocalAccept(atom))); - } - if tx_rec - .evidence() - .is_committee_output_only(local_committee_info.shard_group()) - { - if !tx_rec.has_all_required_foreign_input_pledges(tx, local_committee_info)? { - error!( - target: LOG_TARGET, - "BUG: attempted to propose transaction {} as Prepared but not all foreign input pledges were found. \ - This transaction should not have been marked as ready. {}", - tx_rec.transaction_id(), - tx_rec.evidence() - ); - return Ok(None); - } - let atom = tx_rec.get_local_transaction_atom(); - debug!( - target: LOG_TARGET, - "ℹ️ Transaction {} is output-only for {}, proposing LocalAccept", - tx_rec.transaction_id(), - local_committee_info.shard_group() - ); - Ok(Some(Command::LocalAccept(atom))) - } else { - let atom = tx_rec.get_local_transaction_atom(); - Ok(Some(Command::LocalPrepare(atom))) - } - }, + TransactionPoolStage::Prepared => self.local_prepare_transaction(tx, local_committee_info, &tx_rec), // Leader thinks all foreign PREPARE pledges have been received (condition for LocalPrepared stage to be // ready) TransactionPoolStage::LocalPrepared => self.all_or_some_prepare_transaction( @@ -447,7 +416,7 @@ where TConsensusSpec: ConsensusSpec dont_propose_transactions: bool, base_layer_block_height: u64, base_layer_block_hash: FixedHash, - propose_epoch_end: bool, + can_propose_epoch_end: bool, ) -> Result { // The parent block will only ever not exist if it is a dummy block let parent_exists = Block::record_exists(tx, parent_block.block_id())?; @@ -462,7 +431,7 @@ where TConsensusSpec: ConsensusSpec let mut total_leader_fee = 0; - let batch = if propose_epoch_end { + let batch = if can_propose_epoch_end { ProposalBatch::default() } else { self.fetch_next_proposal_batch( @@ -473,10 +442,22 @@ where TConsensusSpec: ConsensusSpec )? }; - debug!(target: LOG_TARGET, "🌿 PROPOSE: {batch}"); + let mut substate_store = PendingSubstateStore::new( + tx, + *start_of_chain_block.block_id(), + self.config.consensus_constants.num_preshards, + ); - let mut commands = if propose_epoch_end { + debug!(target: LOG_TARGET, "🌿 PROPOSE: {batch}"); + let mut executed_transactions = HashMap::new(); + let mut commands = if can_propose_epoch_end { BTreeSet::from_iter([Command::EndEpoch]) + // self.fetch_end_of_epoch_commands( + // &start_of_chain_block, + // epoch, + // &mut executed_transactions, + // &mut substate_store, + // )? } else { BTreeSet::from_iter( batch @@ -545,12 +526,6 @@ where TConsensusSpec: ConsensusSpec } // batch is empty for is_empty, is_epoch_end and is_epoch_start blocks - let mut substate_store = PendingSubstateStore::new( - tx, - *start_of_chain_block.block_id(), - self.config.consensus_constants.num_preshards, - ); - let mut executed_transactions = HashMap::new(); let timer = TraceTimer::info(LOG_TARGET, "Generating commands").with_iterations(batch.transactions.len()); let mut lock_conflicts = TransactionLockConflicts::new(); for mut transaction in batch.transactions { @@ -759,9 +734,99 @@ where TConsensusSpec: ConsensusSpec burnt_utxos, transactions, evict_nodes, + commands: vec![], }) } + // fn fetch_end_of_epoch_commands( + // &self, + // leaf_block: &LeafBlock, + // epoch: Epoch, + // transaction_executions: &mut HashMap, + // substate_store: &mut PendingSubstateStore, + // ) -> Result { + // let _timer = TraceTimer::debug(LOG_TARGET, "fetch_end_of_epoch_commands"); + // let mut batch = ProposalBatch::default(); + // + // // First, load up the required fee mints + // let fee_mints = ValidatorFeeMint::get_all_for_epoch(substate_store.read_transaction(), epoch)?; + // + // if fee_mints.is_empty() { + // debug!(target: LOG_TARGET, "No FeeMints for epoch {epoch}"); + // batch.commands = vec![Command::EndEpoch(EndEpochAtom::empty())]; + // return Ok(batch); + // } + // + // let mut atom = EndEpochAtom::with_capacity(fee_mints.len()); + // // Then, check for conflicting transactions in LocalAccept phase or greater. + // let mut conflicting_must_finalize = vec![]; + // let mut conflicting_must_abort = vec![]; + // let mut fees_to_mint = vec![]; + // // TODO(perf): worst case O(4n) + // for mint in fee_mints { + // // TODO: check pledges + // match substate_store.get_latest_lock_by_id(&mint.substate_id)? { + // Some(lock) => { + // let mut transaction = self.transaction_pool.get( + // substate_store.read_transaction(), + // leaf_block, + // lock.transaction_id(), + // )?; + // if transaction.current_stage() >= TransactionPoolStage::LocalAccepted { + // conflicting_must_finalize.push(transaction); + // } else { + // let version = lock.version() + 1; + // let mut tx_rec = transaction.get_transaction(substate_store.read_transaction())?; + // tx_rec.abort(RejectReason::ValidatorFeePoolConflict { + // substate_id: mint.substate_id.clone(), + // }); + // transaction.set_local_decision(tx_rec.current_decision()); + // transaction_executions.insert(*tx_rec.id(), tx_rec.into_execution().expect("Aborted above")); + // conflicting_must_abort.push(transaction); + // atom.fee_mints.insert_sorted(mint.substate_id.clone(), FeeMint { + // version, + // amount: mint.amount, + // }); + // fees_to_mint.push(( + // mint.substate_id, + // Substate::new(version, ValidatorFeePool::new(mint.claim_key_bytes.into(), mint.amount)), + // )); + // } + // }, + // None => { + // let version = substate_store.get_latest_version(&mint.substate_id).optional()?; + // let version = version.map(|v| v.version() + 1).unwrap_or(0); + // fees_to_mint.push(( + // mint.substate_id.clone(), + // Substate::new(version, ValidatorFeePool::new(mint.claim_key_bytes.into(), mint.amount)), + // )); + // atom.fee_mints.insert_sorted(mint.substate_id, FeeMint { + // version, + // amount: mint.amount, + // }); + // }, + // } + // } + // + // // If any need to be finalized before epoch end, move to finalize them without proposing epoch end + // if !conflicting_must_finalize.is_empty() { + // info!(target: LOG_TARGET, "❗️ {} transaction(s) must finalize before epoch end"); + // batch.transactions = conflicting_must_finalize; + // return Ok(batch); + // } + // + // if !conflicting_must_abort.is_empty() { + // warn!(target: LOG_TARGET, "❗️ {} transaction(s) must ABORT before epoch end"); + // batch.transactions = conflicting_must_abort; + // } + // + // // Include end epoch command with mints + // batch.commands.push(Command::EndEpoch(atom)); + // + // // Mint the validator fees + // Ok(batch) + // } + #[allow(clippy::too_many_lines)] fn prepare_transaction( &self, @@ -941,6 +1006,44 @@ where TConsensusSpec: ConsensusSpec Ok(Some(command)) } + fn local_prepare_transaction( + &self, + tx: &::ReadTransaction<'_>, + local_committee_info: &CommitteeInfo, + tx_rec: &TransactionPoolRecord, + ) -> Result, HotStuffError> { + if tx_rec.current_decision().is_abort() { + let atom = tx_rec.get_current_transaction_atom(); + return Ok(Some(Command::LocalAccept(atom))); + } + if tx_rec + .evidence() + .is_committee_output_only(local_committee_info.shard_group()) + { + if !tx_rec.has_all_required_foreign_input_pledges(tx, local_committee_info)? { + error!( + target: LOG_TARGET, + "BUG: attempted to propose transaction {} as Prepared but not all foreign input pledges were found. \ + This transaction should not have been marked as ready. {}", + tx_rec.transaction_id(), + tx_rec.evidence() + ); + return Ok(None); + } + let atom = tx_rec.get_local_transaction_atom(); + debug!( + target: LOG_TARGET, + "ℹ️ Transaction {} is output-only for {}, proposing LocalAccept", + tx_rec.transaction_id(), + local_committee_info.shard_group() + ); + Ok(Some(Command::LocalAccept(atom))) + } else { + let atom = tx_rec.get_local_transaction_atom(); + Ok(Some(Command::LocalPrepare(atom))) + } + } + fn all_or_some_prepare_transaction( &self, tx: &::ReadTransaction<'_>, @@ -1035,10 +1138,18 @@ where TConsensusSpec: ConsensusSpec tx_rec.transaction_id(), )) })?; - substate_store.put_diff( - *tx_rec.transaction_id(), - &filter_diff_for_committee(local_committee_info, diff), - )?; + let filtered_diff = filter_diff_for_committee(local_committee_info, diff); + if let Err(err) = substate_store.put_diff(*tx_rec.transaction_id(), &filtered_diff) { + error!( + target: LOG_TARGET, + "🔒 Failed to write to temporary state store for transaction {} for Accept: {}. Skipping proposing this transaction...", + tx_rec.transaction_id(), + err, + ); + // Only error if it is not related to lock errors + let _err = err.ok_lock_failed()?; + return Ok(None); + } let atom = self.get_transaction_atom_with_leader_fee(tx_rec)?; Ok(Some(Command::AllAccept(atom))) } @@ -1098,13 +1209,16 @@ where TConsensusSpec: ConsensusSpec } } -pub fn get_non_local_shards(diff: &[SubstateChange], local_committee_info: &CommitteeInfo) -> HashSet { - diff.iter() +pub fn get_non_local_shards<'a, I: IntoIterator>( + diff: I, + local_committee_info: &CommitteeInfo, +) -> HashSet { + diff.into_iter() .map(|ch| { ch.versioned_substate_id() .to_shard(local_committee_info.num_preshards()) }) - .filter(|shard| local_committee_info.shard_group().contains(shard)) + .filter(|shard| !local_committee_info.shard_group().contains(shard)) .collect() } @@ -1114,17 +1228,19 @@ struct ProposalBatch { pub burnt_utxos: Vec, pub transactions: Vec, pub evict_nodes: Vec, + pub commands: Vec, } impl Display for ProposalBatch { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( f, - "{} transaction(s), {} foreign proposal(s), {} UTXOs, {} evict", + "{} transaction(s), {} foreign proposal(s), {} UTXOs, {} evict, {} command(s)", self.transactions.len(), self.foreign_proposals.len(), self.burnt_utxos.len(), - self.evict_nodes.len() + self.evict_nodes.len(), + self.commands.len() ) } } diff --git a/dan_layer/consensus/src/hotstuff/on_ready_to_vote_on_local_block.rs b/dan_layer/consensus/src/hotstuff/on_ready_to_vote_on_local_block.rs index ca7da5df3e..b44bfc1fcb 100644 --- a/dan_layer/consensus/src/hotstuff/on_ready_to_vote_on_local_block.rs +++ b/dan_layer/consensus/src/hotstuff/on_ready_to_vote_on_local_block.rs @@ -9,7 +9,6 @@ use tari_dan_common_types::{ committee::CommitteeInfo, displayable::Displayable, optional::Optional, - shard::Shard, Epoch, ShardGroup, VersionedSubstateId, @@ -568,8 +567,8 @@ where TConsensusSpec: ConsensusSpec substate_store .diff() .iter() - // Calculate for local shards only AND global shard - .filter(|ch| block.shard_group().contains(&ch.shard()) || ch.shard() == Shard::global()), + // Calculate for local shards only or the global shard + .filter(|ch| block.shard_group().contains_or_global(&ch.shard())), )?; if expected_merkle_root != *block.state_merkle_root() { warn!( diff --git a/dan_layer/consensus/src/hotstuff/substate_store/error.rs b/dan_layer/consensus/src/hotstuff/substate_store/error.rs index a1d43fcf9d..b89deb74ab 100644 --- a/dan_layer/consensus/src/hotstuff/substate_store/error.rs +++ b/dan_layer/consensus/src/hotstuff/substate_store/error.rs @@ -19,6 +19,8 @@ pub enum SubstateStoreError { StoreError(#[from] StorageError), #[error(transparent)] StateTreeError(#[from] tari_state_tree::StateTreeError), + #[error("Invariant error: {details}")] + InvariantError { details: String }, } impl IsNotFoundError for SubstateStoreError { diff --git a/dan_layer/consensus/src/hotstuff/substate_store/pending_store.rs b/dan_layer/consensus/src/hotstuff/substate_store/pending_store.rs index 4a158098ec..fac7a6a71c 100644 --- a/dan_layer/consensus/src/hotstuff/substate_store/pending_store.rs +++ b/dan_layer/consensus/src/hotstuff/substate_store/pending_store.rs @@ -8,6 +8,7 @@ use log::*; use tari_dan_common_types::{ displayable::Displayable, optional::Optional, + substate_type::SubstateType, LockIntent, NumPreshards, SubstateAddress, @@ -22,7 +23,7 @@ use tari_dan_storage::{ StateStore, StateStoreReadTransaction, }; -use tari_engine_types::substate::{Substate, SubstateDiff, SubstateId}; +use tari_engine_types::substate::{Substate, SubstateDiff, SubstateId, SubstateValue}; use tari_transaction::TransactionId; use super::error::SubstateStoreError; @@ -62,6 +63,113 @@ impl<'a, 'tx, TStore: StateStore + 'a> PendingSubstateStore<'a, 'tx, TStore> { pub fn read_transaction(&self) -> &'a TStore::ReadTransaction<'tx> { self.store } + + fn get_latest_change_from_store(&self, id: &SubstateId) -> Result { + if let Some(change) = BlockDiff::get_for_substate(self.read_transaction(), &self.parent_block, id).optional()? { + return Ok(change); + } + + let substate = SubstateRecord::get_latest(self.read_transaction(), id) + .optional()? + .ok_or_else(|| SubstateStoreError::SubstateNotFound { + id: VersionedSubstateId::new(id.clone(), 0), + })?; + if let Some(destroyed) = substate.destroyed() { + return Ok(SubstateChange::Down { + id: VersionedSubstateId::new(id.clone(), substate.version()), + shard: destroyed.by_shard, + transaction_id: destroyed.by_transaction, + }); + } + Ok(SubstateChange::Up { + id: VersionedSubstateId::new(id.clone(), substate.version()), + shard: substate.created_by_shard, + transaction_id: substate.created_by_transaction, + substate: substate + .into_substate() + .expect("PendingSubstateStore::get_latest_change: UP substate has no value"), + }) + } + + pub(crate) fn update_in_place( + &mut self, + substate_id: &SubstateId, + updater: FUpdate, + creator: FCreate, + ) -> Result<(), TErr> + where + TErr: From, + FUpdate: FnOnce(&mut SubstateValue) -> Result<(), TErr>, + FCreate: FnOnce(Option>) -> Result, + { + let num_preshards = self.num_preshards; + if let Some(head_change_mut) = self.get_head_change_mut(substate_id) { + match head_change_mut { + SubstateChange::Up { substate, .. } => { + return updater(substate.substate_value_mut()); + }, + SubstateChange::Down { id, .. } => { + let value = creator(Some(id.as_ref()))?; + let next_id = id.to_next_version(); + let up = SubstateChange::Up { + shard: id.to_shard(num_preshards), + // TODO: determine if we can remove this field + transaction_id: Default::default(), + substate: Substate::new(next_id.version(), value), + id: next_id, + }; + self.put(up)?; + }, + } + return Ok(()); + } + + let Some(change) = self.get_latest_change_from_store(substate_id).optional()? else { + let value = creator(None)?; + let id = VersionedSubstateId::new(substate_id.clone(), 0); + let up = SubstateChange::Up { + shard: id.to_shard(num_preshards), + transaction_id: Default::default(), + substate: Substate::new(id.version(), value), + id, + }; + self.put(up)?; + return Ok(()); + }; + match change { + SubstateChange::Up { + mut substate, + id, + shard, + .. + } => { + updater(substate.substate_value_mut())?; + self.put(SubstateChange::Down { + id: id.clone(), + shard, + transaction_id: Default::default(), + })?; + self.put(SubstateChange::Up { + id: id.to_next_version(), + shard, + transaction_id: Default::default(), + substate, + })?; + }, + SubstateChange::Down { id, .. } => { + let value = creator(Some(id.as_ref()))?; + let next_id = id.to_next_version(); + let up = SubstateChange::Up { + shard: id.to_shard(self.num_preshards), + transaction_id: Default::default(), + substate: Substate::new(next_id.version(), value), + id: next_id, + }; + self.put(up)?; + }, + } + Ok(()) + } } impl<'store, 'tx, TStore: StateStore + 'store + 'tx> ReadableSubstateStore @@ -72,9 +180,12 @@ impl<'store, 'tx, TStore: StateStore + 'store + 'tx> ReadableSubstateStore fn get(&self, id: VersionedSubstateIdRef<'_>) -> Result { let substate_addr = id.to_substate_address(); if let Some(change) = self.get_pending(&substate_addr) { - return change.up().cloned().ok_or_else(|| SubstateStoreError::SubstateIsDown { - id: change.versioned_substate_id().clone(), - }); + return change + .up_substate() + .cloned() + .ok_or_else(|| SubstateStoreError::SubstateIsDown { + id: change.versioned_substate_id().clone(), + }); } if let Some(change) = @@ -117,6 +228,10 @@ impl<'a, 'tx, TStore: StateStore + 'a + 'tx> WriteableSubstateStore for PendingS fn put_diff(&mut self, transaction_id: TransactionId, diff: &SubstateDiff) -> Result<(), Self::Error> { for (id, version) in diff.down_iter() { + // Handled by fee withdrawals below + if id.is_validator_fee_pool() { + continue; + } let id = VersionedSubstateId::new(id.clone(), *version); let shard = id.to_shard(self.num_preshards); debug!(target: LOG_TARGET, "🔽️ Down: {id} {shard}"); @@ -128,6 +243,10 @@ impl<'a, 'tx, TStore: StateStore + 'a + 'tx> WriteableSubstateStore for PendingS } for (id, substate) in diff.up_iter() { + // Handled by fee withdrawals below + if id.is_validator_fee_pool() { + continue; + } let id = VersionedSubstateId::new(id.clone(), substate.version()); let shard = id.to_shard(self.num_preshards); debug!(target: LOG_TARGET, "🔼️ Up: {id} {shard} value hash: {}", substate.to_value_hash()); @@ -139,6 +258,42 @@ impl<'a, 'tx, TStore: StateStore + 'a + 'tx> WriteableSubstateStore for PendingS })?; } + for withdraw in diff.validator_fee_withdrawals() { + let id = withdraw.address.into(); + self.update_in_place( + &id, + |value_mut| { + let substate_type = SubstateType::from(&*value_mut); + let fee_pool = + value_mut + .as_validator_fee_pool_mut() + .ok_or_else(|| SubstateStoreError::InvariantError { + details: format!( + "Expected substate {id} to be a ValidatorFeePool but was {substate_type}", + ), + })?; + if !fee_pool.withdraw_direct(withdraw.amount) { + return Err(SubstateStoreError::InvariantError { + details: format!( + "Insufficient balance to withdraw {} from validator fee pool {} (balance: {})", + withdraw.amount, + withdraw.address, + fee_pool.amount() + ), + }); + } + Ok(()) + }, + // If the substate is down, we cannot withdraw from it + |maybe_id| match maybe_id { + Some(id) => Err(SubstateStoreError::SubstateIsDown { id: id.to_owned() }), + None => Err(SubstateStoreError::SubstateNotFound { + id: VersionedSubstateId::new(id.clone(), 0), + }), + }, + )?; + } + Ok(()) } } @@ -185,31 +340,20 @@ impl<'store, 'tx, TStore: StateStore + 'store + 'tx> PendingSubstateStore<'store Ok(substates) } + fn get_head_change(&self, id: &SubstateId) -> Option<&SubstateChange> { + self.head.get(id).map(|&pos| &self.diff[pos]) + } + + fn get_head_change_mut(&mut self, id: &SubstateId) -> Option<&mut SubstateChange> { + self.head.get(id).map(|&pos| &mut self.diff[pos]) + } + pub fn get_latest_change(&self, id: &SubstateId) -> Result { - if let Some(ch) = self.head.get(id).map(|&pos| &self.diff[pos]) { + if let Some(ch) = self.get_head_change(id) { return Ok(ch.clone()); } - if let Some(change) = BlockDiff::get_for_substate(self.read_transaction(), &self.parent_block, id).optional()? { - return Ok(change); - } - - let substate = SubstateRecord::get_latest(self.read_transaction(), id)?; - if let Some(destroyed) = substate.destroyed() { - return Ok(SubstateChange::Down { - id: VersionedSubstateId::new(id.clone(), substate.version()), - shard: destroyed.by_shard, - transaction_id: destroyed.by_transaction, - }); - } - Ok(SubstateChange::Up { - id: VersionedSubstateId::new(id.clone(), substate.version()), - shard: substate.created_by_shard, - transaction_id: substate.created_by_transaction, - substate: substate - .into_substate() - .expect("PendingSubstateStore::get_latest_change: UP substate has no value"), - }) + self.get_latest_change_from_store(id) } pub fn has_any_conflicting_pledges<'a, I>( @@ -548,7 +692,7 @@ impl<'store, 'tx, TStore: StateStore + 'store + 'tx> PendingSubstateStore<'store self.diff.push(change) } - fn get_latest_lock_by_id(&self, id: &SubstateId) -> Result>, SubstateStoreError> { + pub fn get_latest_lock_by_id(&self, id: &SubstateId) -> Result>, SubstateStoreError> { if let Some(lock) = self.new_locks.get(id).and_then(|locks| locks.last()) { return Ok(Some(Cow::Borrowed(lock))); } diff --git a/dan_layer/consensus/src/hotstuff/transaction_manager/manager.rs b/dan_layer/consensus/src/hotstuff/transaction_manager/manager.rs index 761aaea05d..9d25a186e7 100644 --- a/dan_layer/consensus/src/hotstuff/transaction_manager/manager.rs +++ b/dan_layer/consensus/src/hotstuff/transaction_manager/manager.rs @@ -12,7 +12,6 @@ use tari_dan_common_types::{ LockIntent, SubstateRequirement, SubstateRequirementRef, - ToSubstateAddress, VersionedSubstateId, }; use tari_dan_storage::{ @@ -197,12 +196,7 @@ impl> if !err.is_not_found_error() && !err.is_substate_down_error() { return Err(err); } - let is_local_only = local_committee_info.includes_all_substate_addresses( - transaction - .transaction - .all_inputs_iter() - .map(|i| i.or_zero_version().to_substate_address()), - ); + let is_local_only = local_committee_info.is_all_local(transaction.transaction.all_inputs_iter()); // Currently this message will differ depending on which involved shard is asked. // e.g. local nodes will say "failed to lock inputs", foreign nodes will say "foreign shard abort" transaction.abort(RejectReason::OneOrMoreInputsNotFound(err.to_string())); @@ -258,8 +252,7 @@ impl> .unwrap_or_else(|| self.execute_or_fetch(store, transaction, current_epoch, &local_inputs, block_id))?; // local-only transaction can be determined if we've executed the transaction - let is_local_only = local_committee_info - .includes_all_substate_addresses(execution.resulting_outputs().iter().map(|o| o.to_substate_address())); + let is_local_only = local_committee_info.is_all_local(execution.resulting_outputs()); if is_local_only { info!( target: LOG_TARGET, @@ -279,6 +272,7 @@ impl> warn!(target: LOG_TARGET, "⚠️ PREPARE: Hard conflict when locking inputs: {err}"); execution.set_abort_reason(RejectReason::FailedToLockInputs(err.to_string())); } + Ok(PreparedTransaction::new_local_accept(execution, lock_status)) } else { info!(target: LOG_TARGET, "👨‍🔧 PREPARE: transaction {} has local inputs and foreign outputs (Local decision: {})", execution.id(), execution.decision()); diff --git a/dan_layer/consensus_tests/src/consensus.rs b/dan_layer/consensus_tests/src/consensus.rs index d09b76afeb..cea0b14c88 100644 --- a/dan_layer/consensus_tests/src/consensus.rs +++ b/dan_layer/consensus_tests/src/consensus.rs @@ -12,10 +12,14 @@ use std::time::Duration; use log::info; use tari_common_types::types::PrivateKey; -use tari_consensus::{hotstuff::HotStuffError, messages::HotstuffMessage}; +use tari_consensus::{ + hotstuff::{to_public_key_bytes, HotStuffError}, + messages::HotstuffMessage, +}; use tari_crypto::tari_utilities::ByteArray; use tari_dan_common_types::{ - crypto::create_key_pair, + crypto::{create_key_pair, create_key_pair_from_seed}, + derive_fee_pool_address, optional::Optional, Epoch, NodeHeight, @@ -34,6 +38,7 @@ use tari_engine_types::{ hashing::hash_template_code, published_template::PublishedTemplateAddress, substate::SubstateId, + ValidatorFeeWithdrawal, }; use tari_transaction::Transaction; @@ -911,6 +916,7 @@ async fn single_shard_input_conflict() { fee: 1, input_locks: vec![(substate_id.substate_id().clone(), SubstateLockType::Write)], new_outputs: vec![], + validator_fee_withdrawals: vec![], }) .add_execution_at_destination(TestVnDestination::All, ExecuteSpec { transaction: tx2.transaction().clone(), @@ -918,6 +924,7 @@ async fn single_shard_input_conflict() { fee: 1, input_locks: vec![(substate_id.substate_id().clone(), SubstateLockType::Write)], new_outputs: vec![], + validator_fee_withdrawals: vec![], }); test.network() @@ -1179,6 +1186,7 @@ async fn single_shard_unversioned_inputs() { .map(|input| (input.into_substate_id(), SubstateLockType::Write)) .collect(), new_outputs: vec![], + validator_fee_withdrawals: vec![], }); test.start_epoch(Epoch(1)).await; @@ -1262,6 +1270,7 @@ async fn multishard_unversioned_input_conflict() { (id1.substate_id().clone(), SubstateLockType::Write), ], new_outputs: vec![], + validator_fee_withdrawals: vec![], }) .add_execution_at_destination(TestVnDestination::All, ExecuteSpec { transaction: tx2.transaction().clone(), @@ -1272,6 +1281,7 @@ async fn multishard_unversioned_input_conflict() { (id1.substate_id().clone(), SubstateLockType::Write), ], new_outputs: vec![], + validator_fee_withdrawals: vec![], }); // NOTE: we send tx1 to committee 0 and tx2 to committee 1 to loosely ensure that we create the situation this test @@ -1361,6 +1371,7 @@ async fn multishard_unversioned_input_conflict_delay_prepare() { (id1.substate_id().clone(), SubstateLockType::Write), ], new_outputs: vec![], + validator_fee_withdrawals: vec![], }) .add_execution_at_destination(TestVnDestination::All, ExecuteSpec { transaction: tx2.transaction().clone(), @@ -1371,6 +1382,7 @@ async fn multishard_unversioned_input_conflict_delay_prepare() { (id2.substate_id().clone(), SubstateLockType::Write), ], new_outputs: vec![], + validator_fee_withdrawals: vec![], }); test.network() @@ -1545,6 +1557,7 @@ async fn multishard_publish_template() { .map(|input| (input.into_substate_id(), SubstateLockType::Write)) .collect(), new_outputs: vec![SubstateId::Template(template_id)], + validator_fee_withdrawals: vec![], }); test.start_epoch(Epoch(1)).await; @@ -1564,7 +1577,7 @@ async fn multishard_publish_template() { test.assert_all_validators_at_same_height().await; test.assert_all_validators_committed(tx.id()); - // Assert all LocalOnly + // Assert all have the template let template_substate = test .get_validator(&TestAddress::new("1")) .state_store @@ -1581,6 +1594,95 @@ async fn multishard_publish_template() { test.assert_clean_shutdown().await; } +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn multishard_validator_fee_claim() { + setup_logger(); + let (claim_sk, claim_pk) = create_key_pair_from_seed(100); + let claim_bytes = to_public_key_bytes(&claim_pk); + let mut test = Test::builder() + .add_committee(0, vec!["1", "2"]) + .add_committee(1, vec!["3", "4"]) + .set_claim_key(TestVnDestination::All, claim_pk) + .start() + .await; + // Create and send publish template transaction + let inputs = test.create_substates_on_vns(TestVnDestination::All, 1); + let address = derive_fee_pool_address( + claim_bytes, + test.num_preshards(), + test.num_preshards() + .all_shard_groups_iter(test.num_committees()) + .next() + .unwrap() + .start(), + ); + let claim_tx = Transaction::builder() + .claim_validator_fees(address) + .with_inputs(inputs.iter().cloned().map(Into::into)) + .add_input(address) + .build_and_seal(&claim_sk); + let claim_tx = TransactionRecord::new(claim_tx); + + test.add_execution_at_destination(TestVnDestination::All, ExecuteSpec { + transaction: claim_tx.transaction().clone(), + decision: Decision::Commit, + fee: 1000, + input_locks: inputs + .into_iter() + .map(|input| (input.into_substate_id(), SubstateLockType::Write)) + .collect(), + new_outputs: vec![], + validator_fee_withdrawals: vec![ValidatorFeeWithdrawal { + address, + amount: 500.into(), + }], + }); + + // Get some fees + test.send_transaction_to_all(Decision::Commit, 1000, 1, 1).await; + test.send_transaction_to_all(Decision::Commit, 1000, 1, 1).await; + + test.start_epoch(Epoch(1)).await; + let mut tx_sent = false; + + loop { + test.on_block_committed().await; + + let leaf = test.get_validator(&TestAddress::new("1")).get_leaf_block(); + + let is_pool_empty = test.is_transaction_pool_empty(); + let mut sent_now = false; + if !tx_sent && (is_pool_empty || leaf.height >= NodeHeight(15)) { + // Send a claim + test.send_transaction_to_destination(TestVnDestination::All, claim_tx.clone()) + .await; + tx_sent = true; + sent_now = true; + } + + // Prevent race condition between sending the transaction and it entering the pool + if !sent_now && is_pool_empty { + break; + } + + if leaf.height >= NodeHeight(30) { + panic!("Not all transaction committed after {} blocks", leaf.height); + } + } + + test.assert_all_validators_at_same_height().await; + test.assert_all_validators_committed(claim_tx.id()); + + // Assert fee pool exists + let _fee_pool = test + .get_validator(&TestAddress::new("1")) + .state_store + .with_read_tx(|tx| SubstateRecord::get_latest(tx, &address.into())) + .unwrap(); + + test.assert_clean_shutdown().await; +} + // mod dump_data { // use super::*; // use std::fs::File; diff --git a/dan_layer/consensus_tests/src/support/epoch_manager.rs b/dan_layer/consensus_tests/src/support/epoch_manager.rs index fe26d9f471..6ecc7400a7 100644 --- a/dan_layer/consensus_tests/src/support/epoch_manager.rs +++ b/dan_layer/consensus_tests/src/support/epoch_manager.rs @@ -17,7 +17,12 @@ use tari_dan_storage::{global::models::ValidatorNode, StorageError}; use tari_epoch_manager::{EpochManagerError, EpochManagerEvent, EpochManagerReader}; use tokio::sync::{broadcast, Mutex, MutexGuard}; -use crate::support::{address::TestAddress, helpers::random_substate_in_shard_group, TEST_NUM_PRESHARDS}; +use crate::support::{ + address::TestAddress, + helpers::random_substate_in_shard_group, + TestVnDestination, + TEST_NUM_PRESHARDS, +}; #[derive(Debug, Clone)] pub struct TestEpochManager { @@ -57,7 +62,13 @@ impl TestEpochManager { self.inner.lock().await } - pub fn clone_for(&self, address: TestAddress, public_key: PublicKey, shard_key: SubstateAddress) -> Self { + pub fn clone_for( + &self, + address: TestAddress, + public_key: PublicKey, + shard_key: SubstateAddress, + fee_claim_pk: PublicKey, + ) -> Self { let mut copy = self.clone(); if let Some(our_validator_node) = self.our_validator_node.clone() { copy.our_validator_node = Some(ValidatorNode { @@ -66,7 +77,7 @@ impl TestEpochManager { shard_key, start_epoch: our_validator_node.start_epoch, end_epoch: None, - fee_claim_public_key: public_key, + fee_claim_public_key: fee_claim_pk, }); } else { copy.our_validator_node = Some(ValidatorNode { @@ -75,13 +86,13 @@ impl TestEpochManager { shard_key, start_epoch: Epoch(0), end_epoch: None, - fee_claim_public_key: public_key, + fee_claim_public_key: fee_claim_pk, }); } copy } - pub async fn add_committees(&self, committees: HashMap>) { + pub async fn add_committees(&self, committees: HashMap>) -> &Self { let mut state = self.state_lock().await; for (shard_group, committee) in committees { for (address, pk) in &committee.members { @@ -106,6 +117,18 @@ impl TestEpochManager { state.committees.insert(shard_group, committee); } + self + } + + pub async fn set_claim_keys(&self, dest: TestVnDestination, claim_key: PublicKey) -> &Self { + let mut state = self.state_lock().await; + let num_committees = state.committees.len() as u32; + state.validator_nodes.iter_mut().for_each(|(address, (vn, sg))| { + if dest.is_for(address, *sg, num_committees) { + vn.fee_claim_public_key = claim_key.clone(); + } + }); + self } pub async fn all_validators(&self) -> Vec<(ValidatorNode, ShardGroup)> { diff --git a/dan_layer/consensus_tests/src/support/executions_store.rs b/dan_layer/consensus_tests/src/support/executions_store.rs index af8de5ffff..ed345b9824 100644 --- a/dan_layer/consensus_tests/src/support/executions_store.rs +++ b/dan_layer/consensus_tests/src/support/executions_store.rs @@ -8,7 +8,7 @@ use std::{ use tari_dan_common_types::SubstateLockType; use tari_dan_storage::consensus_models::Decision; -use tari_engine_types::substate::SubstateId; +use tari_engine_types::{substate::SubstateId, ValidatorFeeWithdrawal}; use tari_transaction::{Transaction, TransactionId}; type TestExecutionOutputMap = HashMap; @@ -42,4 +42,5 @@ pub struct ExecuteSpec { pub fee: u64, pub input_locks: Vec<(SubstateId, SubstateLockType)>, pub new_outputs: Vec, + pub validator_fee_withdrawals: Vec, } diff --git a/dan_layer/consensus_tests/src/support/harness.rs b/dan_layer/consensus_tests/src/support/harness.rs index ad7d48540e..84689756ea 100644 --- a/dan_layer/consensus_tests/src/support/harness.rs +++ b/dan_layer/consensus_tests/src/support/harness.rs @@ -11,6 +11,7 @@ use futures::{stream::FuturesUnordered, FutureExt, StreamExt}; use itertools::Itertools; use log::info; use tari_common::configuration::Network; +use tari_common_types::types::PublicKey; use tari_consensus::{ consensus_constants::ConsensusConstants, hotstuff::{HotstuffConfig, HotstuffEvent}, @@ -93,6 +94,7 @@ impl Test { .map(|input| (input.substate_id().clone(), SubstateLockType::Write)) .collect(), new_outputs: new_outputs.clone(), + validator_fee_withdrawals: vec![], }); self.send_transaction_to_destination(TestVnDestination::All, transaction.clone()) @@ -127,6 +129,7 @@ impl Test { fee: transaction.transaction_fee().unwrap_or(1), input_locks, new_outputs, + validator_fee_withdrawals: vec![], }); self } @@ -215,6 +218,14 @@ impl Test { &self.validators } + pub const fn num_preshards(&self) -> NumPreshards { + TEST_NUM_PRESHARDS + } + + pub fn num_committees(&self) -> u32 { + self.num_committees + } + pub async fn on_hotstuff_event(&mut self) -> (TestAddress, HotstuffEvent) { if self.network.task_handle().is_finished() { panic!("Network task exited while waiting for Hotstuff event"); @@ -529,6 +540,7 @@ pub struct TestBuilder { message_filter: Option, failure_nodes: Vec, config: HotstuffConfig, + claim_keys: Option<(TestVnDestination, PublicKey)>, } impl TestBuilder { @@ -540,6 +552,7 @@ impl TestBuilder { debug_sql_file: None, message_filter: None, failure_nodes: Vec::new(), + claim_keys: None, config: HotstuffConfig { network: Network::LocalNet, sidechain_id: None, @@ -585,6 +598,11 @@ impl TestBuilder { self } + pub fn set_claim_key(mut self, dest: TestVnDestination, claim_key: PublicKey) -> Self { + self.claim_keys = Some((dest, claim_key)); + self + } + pub fn add_committee(mut self, committee_num: u32, addresses: Vec<&'static str>) -> Self { let entry = self .committees @@ -645,7 +663,8 @@ impl TestBuilder { .with_address_and_secret_key(vn.address.clone(), sk) .with_shard(vn.shard_key) .with_shard_group(shard_group) - .with_epoch_manager(epoch_manager.clone_for(vn.address.clone(), pk, vn.shard_key)) + .with_fee_claim_public_key(vn.fee_claim_public_key.clone()) + .with_epoch_manager(epoch_manager.clone_for(vn.address.clone(), pk, vn.shard_key, vn.fee_claim_public_key)) .with_leader_strategy(*leader_strategy) .with_num_committees(num_committees) .spawn(shutdown_signal.clone()); @@ -675,6 +694,9 @@ impl TestBuilder { let (tx_epoch_events, _) = broadcast::channel(10); let epoch_manager = TestEpochManager::new(tx_epoch_events); epoch_manager.add_committees(committees).await; + if let Some((dest, claim_key)) = self.claim_keys { + epoch_manager.set_claim_keys(dest, claim_key).await; + } let shutdown = Shutdown::new(); let (channels, validators) = Self::build_validators( &leader_strategy, diff --git a/dan_layer/consensus_tests/src/support/transaction.rs b/dan_layer/consensus_tests/src/support/transaction.rs index 5d122222ea..2a123e301c 100644 --- a/dan_layer/consensus_tests/src/support/transaction.rs +++ b/dan_layer/consensus_tests/src/support/transaction.rs @@ -14,6 +14,8 @@ use tari_engine_types::{ published_template::PublishedTemplate, substate::{Substate, SubstateDiff, SubstateId}, transaction_receipt::{TransactionReceipt, TransactionReceiptAddress}, + ValidatorFeePool, + ValidatorFeeWithdrawal, }; use tari_template_lib::args; use tari_transaction::Transaction; @@ -34,6 +36,7 @@ pub fn create_execution_result_for_transaction( fee: u64, resolved_inputs: &[VersionedSubstateIdLockIntent], resulting_outputs: &[VersionedSubstateIdLockIntent], + validator_fee_withdrawals: Vec, ) -> ExecuteResult { let result = if decision.is_commit() { let mut diff = SubstateDiff::new(); @@ -84,10 +87,20 @@ pub fn create_execution_result_for_transaction( }), ); }, + SubstateId::ValidatorFeePool(_) => { + diff.up( + output.versioned_substate_id().substate_id().clone(), + Substate::new(output.versioned_substate_id().version(), ValidatorFeePool { + // This does not matter in tests + claim_public_key: Default::default(), + amount: 100_000.into(), + }), + ); + }, _ => { panic!( - "create_execution_result_for_transaction: Test harness only supports generating component and \ - template outputs. Got {output}" + "create_execution_result_for_transaction: Test harness only supports generating component, vn \ + fee, and template outputs. Got {output}" ); }, } @@ -107,6 +120,8 @@ pub fn create_execution_result_for_transaction( }), ); + diff.set_once_fee_withdrawals(validator_fee_withdrawals); + TransactionResult::Accept(diff) } else { TransactionResult::Reject(RejectReason::ExecutionFailure( diff --git a/dan_layer/consensus_tests/src/support/transaction_executor.rs b/dan_layer/consensus_tests/src/support/transaction_executor.rs index c61c94bd06..88fc2ead2a 100644 --- a/dan_layer/consensus_tests/src/support/transaction_executor.rs +++ b/dan_layer/consensus_tests/src/support/transaction_executor.rs @@ -107,6 +107,14 @@ impl BlockTransactionExecutor for TestBloc 0, ))) .map(VersionedSubstateIdLockIntent::output) + .chain( + spec.validator_fee_withdrawals + .iter() + .filter_map(|w| { + let input = resolved_inputs.iter().find(|i| i.versioned_substate_id().substate_id().as_validator_fee_pool_address() == Some(w.address))?; + Some(VersionedSubstateIdLockIntent::output(VersionedSubstateId::new(w.address, input.version() + 1))) + }) + ) .collect::>(); let result = create_execution_result_for_transaction( @@ -115,6 +123,7 @@ impl BlockTransactionExecutor for TestBloc spec.fee, &resolved_inputs, &resulting_outputs, + spec.validator_fee_withdrawals, ); let executed = ExecutedTransaction::new(transaction, result, resolved_inputs); diff --git a/dan_layer/consensus_tests/src/support/validator/builder.rs b/dan_layer/consensus_tests/src/support/validator/builder.rs index cc3d4ca4c4..0337ae3501 100644 --- a/dan_layer/consensus_tests/src/support/validator/builder.rs +++ b/dan_layer/consensus_tests/src/support/validator/builder.rs @@ -33,6 +33,7 @@ pub struct ValidatorBuilder { pub secret_key: PrivateKey, pub public_key: PublicKey, pub shard_address: SubstateAddress, + pub fee_claim_public_key: PublicKey, pub shard_group: ShardGroup, pub sql_url: String, pub leader_strategy: RoundRobinLeaderStrategy, @@ -48,6 +49,7 @@ impl ValidatorBuilder { address: TestAddress::new("default"), secret_key: PrivateKey::default(), public_key: PublicKey::default(), + fee_claim_public_key: PublicKey::default(), shard_address: SubstateAddress::zero(), num_committees: 0, shard_group: ShardGroup::all_shards(TEST_NUM_PRESHARDS), @@ -76,6 +78,11 @@ impl ValidatorBuilder { self } + pub fn with_fee_claim_public_key(&mut self, fee_claim_public_key: PublicKey) -> &mut Self { + self.fee_claim_public_key = fee_claim_public_key; + self + } + pub fn with_shard(&mut self, shard: SubstateAddress) -> &mut Self { self.shard_address = shard; self @@ -117,6 +124,7 @@ impl ValidatorBuilder { self.address.clone(), self.public_key.clone(), self.shard_address, + self.fee_claim_public_key.clone(), ); let (outbound_messaging, rx_loopback) = diff --git a/dan_layer/engine/src/runtime/impl.rs b/dan_layer/engine/src/runtime/impl.rs index 7a5c94e3f0..ea9c54f746 100644 --- a/dan_layer/engine/src/runtime/impl.rs +++ b/dan_layer/engine/src/runtime/impl.rs @@ -43,8 +43,8 @@ use tari_engine_types::{ resource_container::ResourceContainer, substate::{SubstateId, SubstateValue}, vault::Vault, - vn_fee_pool::ValidatorFeePoolAddress, TemplateAddress, + ValidatorFeePoolAddress, }; use tari_template_abi::{TemplateDef, Type}; use tari_template_builtin::{ACCOUNT_NFT_TEMPLATE_ADDRESS, ACCOUNT_TEMPLATE_ADDRESS}; @@ -2240,10 +2240,8 @@ impl> RuntimeInte let bucket_id = state.new_bucket_id(); state.new_bucket(bucket_id, resource)?; state.set_last_instruction_output(IndexedValue::from_type(&bucket_id)?); - Ok::<_, RuntimeError>(()) - })?; - - Ok(()) + Ok(()) + }) } fn set_fee_checkpoint(&self) -> Result<(), RuntimeError> { diff --git a/dan_layer/engine/src/runtime/mod.rs b/dan_layer/engine/src/runtime/mod.rs index 515a8c81f3..7381040d7d 100644 --- a/dan_layer/engine/src/runtime/mod.rs +++ b/dan_layer/engine/src/runtime/mod.rs @@ -61,7 +61,7 @@ use tari_engine_types::{ indexed_value::IndexedValue, lock::LockFlag, substate::SubstateValue, - vn_fee_pool::ValidatorFeePoolAddress, + ValidatorFeePoolAddress, }; use tari_template_lib::{ args::{ diff --git a/dan_layer/engine/src/runtime/tracker.rs b/dan_layer/engine/src/runtime/tracker.rs index 16e9864ba8..bc0c52b5e2 100644 --- a/dan_layer/engine/src/runtime/tracker.rs +++ b/dan_layer/engine/src/runtime/tracker.rs @@ -302,7 +302,8 @@ impl StateTracker { let fee_receipt = transaction_receipt.fee_receipt.clone(); - let result = state.generate_substate_diff(transaction_receipt, substates_to_persist); + let fee_withdrawals = state.take_validator_fee_withdrawals(); + let result = state.generate_substate_diff(transaction_receipt, substates_to_persist, fee_withdrawals); let result = match result { Ok(substate_diff) => TransactionResult::Accept(substate_diff), diff --git a/dan_layer/engine/src/runtime/working_state.rs b/dan_layer/engine/src/runtime/working_state.rs index 3674b8bc6b..916ffe2441 100644 --- a/dan_layer/engine/src/runtime/working_state.rs +++ b/dan_layer/engine/src/runtime/working_state.rs @@ -27,8 +27,9 @@ use tari_engine_types::{ transaction_receipt::TransactionReceipt, vault::Vault, virtual_substate::{VirtualSubstate, VirtualSubstateId, VirtualSubstates}, - vn_fee_pool::ValidatorFeePoolAddress, TemplateAddress, + ValidatorFeePoolAddress, + ValidatorFeeWithdrawal, }; use tari_template_lib::{ args::{MintArg, ResourceDiscriminator}, @@ -81,6 +82,7 @@ pub(super) struct WorkingState { claimed_confidential_outputs: Vec, virtual_substates: VirtualSubstates, + validator_fee_withdrawals: Vec, last_instruction_output: Option, workspace: Workspace, @@ -113,6 +115,7 @@ impl WorkingState { workspace: Workspace::default(), virtual_substates, + validator_fee_withdrawals: Vec::new(), call_frames: Vec::new(), initial_call_scope, fee_state: FeeState::new(), @@ -747,26 +750,32 @@ impl WorkingState { address: ValidatorFeePoolAddress, ) -> Result { let locked_substate = self.lock_substate(&SubstateId::ValidatorFeePool(address), LockFlag::Write)?; - let fee_pool = self - .get_locked_substate(&locked_substate)? - .as_validator_fee_pool() - .ok_or_else(|| RuntimeError::InvariantError { - function: "StateTracker::claim_fee", - details: format!("Expected substate at address {address} to be an ValidatorFeePool",), - })?; + { + let fee_pool = self + .get_locked_substate(&locked_substate)? + .as_validator_fee_pool() + .ok_or_else(|| RuntimeError::InvariantError { + function: "StateTracker::withdraw_all_fees_from_pool", + details: format!("Expected substate at address {address} to be an ValidatorFeePool",), + })?; - self.authorization() - .require_ownership(NativeAction::WithdrawValidatorFunds, fee_pool.as_ownership())?; + self.authorization() + .require_ownership(NativeAction::WithdrawValidatorFunds, fee_pool.as_ownership())?; + } let pool_mut = self .get_locked_substate_mut(&locked_substate)? .as_validator_fee_pool_mut() .ok_or_else(|| RuntimeError::InvariantError { - function: "StateTracker::claim_fee", + function: "StateTracker::withdraw_all_fees_from_pool", details: format!("Expected substate at address {address} to be an ValidatorFeePool",), })?; let resource_container = pool_mut.withdraw_all()?; + self.validator_fee_withdrawals.push(ValidatorFeeWithdrawal { + address, + amount: resource_container.amount(), + }); Ok(resource_container) } @@ -826,6 +835,10 @@ impl WorkingState { self.store.mutated_substates() } + pub fn take_validator_fee_withdrawals(&mut self) -> Vec { + mem::take(&mut self.validator_fee_withdrawals) + } + pub fn fee_state(&self) -> &FeeState { &self.fee_state } @@ -1120,13 +1133,16 @@ impl WorkingState { &self, transaction_receipt: TransactionReceipt, substates_to_persist: IndexMap, + fee_withdrawals: Vec, ) -> Result { let mut substate_diff = SubstateDiff::new(); - for (address, substate) in substates_to_persist { - let new_substate = match self.store.get_unmodified_substate(&address).optional()? { + substate_diff.set_once_fee_withdrawals(fee_withdrawals); + + for (id, substate) in substates_to_persist { + let new_substate = match self.store.get_unmodified_substate(&id).optional()? { Some(existing_state) => { - substate_diff.down(address.clone(), existing_state.version()); + substate_diff.down(id.clone(), existing_state.version()); if substate.as_validator_fee_pool().is_some_and(|fee| fee.amount.is_zero()) { // If there are no fees left, do not up the fee pool continue; @@ -1135,7 +1151,7 @@ impl WorkingState { }, None => Substate::new(0, substate), }; - substate_diff.up(address, new_substate); + substate_diff.up(id, new_substate); } // Special case: unclaimed confidential outputs are downed without being upped if claimed diff --git a/dan_layer/engine_types/src/indexed_value.rs b/dan_layer/engine_types/src/indexed_value.rs index 252d4272d7..ff60e494b9 100644 --- a/dan_layer/engine_types/src/indexed_value.rs +++ b/dan_layer/engine_types/src/indexed_value.rs @@ -25,7 +25,7 @@ use crate::{ serde_with, substate::SubstateId, transaction_receipt::TransactionReceiptAddress, - vn_fee_pool::ValidatorFeePoolAddress, + ValidatorFeePoolAddress, }; const MAX_VISITOR_DEPTH: usize = 50; diff --git a/dan_layer/engine_types/src/instruction.rs b/dan_layer/engine_types/src/instruction.rs index 26c68e06d3..6cc2d977d4 100644 --- a/dan_layer/engine_types/src/instruction.rs +++ b/dan_layer/engine_types/src/instruction.rs @@ -14,7 +14,7 @@ use tari_template_lib::{ #[cfg(feature = "ts")] use ts_rs::TS; -use crate::{confidential::ConfidentialClaim, serde_with, vn_fee_pool::ValidatorFeePoolAddress}; +use crate::{confidential::ConfidentialClaim, serde_with, ValidatorFeePoolAddress}; #[derive(Debug, Clone, Deserialize, Serialize, PartialEq)] #[cfg_attr(feature = "ts", derive(TS), ts(export, export_to = "../../bindings/src/types/"))] diff --git a/dan_layer/engine_types/src/lib.rs b/dan_layer/engine_types/src/lib.rs index fd4af11ab8..49be98f014 100644 --- a/dan_layer/engine_types/src/lib.rs +++ b/dan_layer/engine_types/src/lib.rs @@ -34,9 +34,9 @@ pub mod id_provider; mod argument_parser; pub mod published_template; mod substate_serde; -pub mod vn_fee_pool; - +mod validator_fee; pub use argument_parser::parse_arg; +pub use validator_fee::*; pub mod template_models { pub use tari_template_lib::models::*; diff --git a/dan_layer/engine_types/src/substate.rs b/dan_layer/engine_types/src/substate.rs index 6f18fe8979..f07363317b 100644 --- a/dan_layer/engine_types/src/substate.rs +++ b/dan_layer/engine_types/src/substate.rs @@ -54,7 +54,9 @@ use crate::{ resource::Resource, transaction_receipt::{TransactionReceipt, TransactionReceiptAddress}, vault::Vault, - vn_fee_pool::{ValidatorFeePool, ValidatorFeePoolAddress}, + ValidatorFeePool, + ValidatorFeePoolAddress, + ValidatorFeeWithdrawal, }; #[derive(Debug, Clone, Serialize, Deserialize)] @@ -176,6 +178,13 @@ impl SubstateId { } } + pub fn as_validator_fee_pool_address(&self) -> Option { + match self { + Self::ValidatorFeePool(address) => Some(*address), + _ => None, + } + } + pub fn to_bytes(&self) -> Vec { encode(self).unwrap() } @@ -280,6 +289,10 @@ impl SubstateId { matches!(self, Self::Template(_)) } + pub fn is_validator_fee_pool(&self) -> bool { + matches!(self, Self::ValidatorFeePool(_)) + } + pub fn is_global(&self) -> bool { self.is_template() } @@ -748,6 +761,13 @@ impl SubstateValue { } } + pub fn into_validator_fee_pool(self) -> Option { + match self { + SubstateValue::ValidatorFeePool(value) => Some(value), + _ => None, + } + } + pub fn as_validator_fee_pool_mut(&mut self) -> Option<&mut ValidatorFeePool> { match self { SubstateValue::ValidatorFeePool(value) => Some(value), @@ -834,6 +854,7 @@ impl From for SubstateValue { pub struct SubstateDiff { up_substates: Vec<(SubstateId, Substate)>, down_substates: Vec<(SubstateId, u32)>, + fee_withdrawals: Vec, } impl SubstateDiff { @@ -841,11 +862,22 @@ impl SubstateDiff { Self { up_substates: Vec::new(), down_substates: Vec::new(), + fee_withdrawals: Vec::new(), } } - pub fn up(&mut self, address: SubstateId, value: Substate) { - self.up_substates.push((address, value)); + pub fn up(&mut self, id: SubstateId, value: Substate) { + self.up_substates.push((id, value)); + } + + /// Set the fee withdrawals for this diff. + /// + /// # Panics + /// Panics if the fee withdrawals have already been set. + pub fn set_once_fee_withdrawals(&mut self, withdrawals: Vec) -> &mut Self { + assert!(self.fee_withdrawals.is_empty(), "Fee withdrawals set more than once"); + self.fee_withdrawals = withdrawals; + self } pub fn extend_up(&mut self, iter: impl Iterator) -> &mut Self { @@ -853,8 +885,8 @@ impl SubstateDiff { self } - pub fn down(&mut self, address: SubstateId, version: u32) { - self.down_substates.push((address, version)); + pub fn down(&mut self, id: SubstateId, version: u32) { + self.down_substates.push((id, version)); } pub fn extend_down(&mut self, iter: impl Iterator) -> &mut Self { @@ -874,6 +906,10 @@ impl SubstateDiff { self.down_substates.iter() } + pub fn validator_fee_withdrawals(&self) -> &[ValidatorFeeWithdrawal] { + &self.fee_withdrawals + } + pub fn up_len(&self) -> usize { self.up_substates.len() } diff --git a/dan_layer/engine_types/src/substate_serde.rs b/dan_layer/engine_types/src/substate_serde.rs index c799852318..43f6e9a8c6 100644 --- a/dan_layer/engine_types/src/substate_serde.rs +++ b/dan_layer/engine_types/src/substate_serde.rs @@ -21,7 +21,7 @@ use crate::{ published_template::PublishedTemplateAddress, substate::SubstateId, transaction_receipt::TransactionReceiptAddress, - vn_fee_pool::ValidatorFeePoolAddress, + ValidatorFeePoolAddress, }; impl serde::Serialize for SubstateId { diff --git a/dan_layer/engine_types/src/vn_fee_pool.rs b/dan_layer/engine_types/src/validator_fee.rs similarity index 64% rename from dan_layer/engine_types/src/vn_fee_pool.rs rename to dan_layer/engine_types/src/validator_fee.rs index 34efac4563..7a1e1321d7 100644 --- a/dan_layer/engine_types/src/vn_fee_pool.rs +++ b/dan_layer/engine_types/src/validator_fee.rs @@ -1,5 +1,5 @@ -// Copyright 2024 The Tari Project -// SPDX-License-Identifier: BSD-3-Clause +// Copyright 2025 The Tari Project +// SPDX-License-Identifier: BSD-3-Clause use std::{ borrow::Cow, @@ -8,7 +8,8 @@ use std::{ str::FromStr, }; -use tari_bor::{BorTag, Deserialize, Serialize}; +use serde::{Deserialize, Serialize}; +use tari_bor::BorTag; use tari_template_lib::{ auth::{OwnerRule, Ownership}, constants::XTR, @@ -120,11 +121,50 @@ impl ValidatorFeePool { } } - pub fn deposit(&mut self, amount: Amount) -> &mut Self { - self.amount += amount; - self + /// Withdraws the given amount from the pool. If the amount is greater than the current balance, the function will + /// return false and the balance will remain unchanged. + /// NB: Do not use this function in the engine. This is used at the consensus level to update fee substates in + /// place. + #[must_use] + pub fn withdraw_direct(&mut self, amount: Amount) -> bool { + match self.amount.checked_sub_positive(amount) { + Some(new_amount) => { + self.amount = new_amount; + true + }, + None => false, + } + } + + /// Deposits the given amount into the pool. If the amount is zero or negative, the function will return false and + /// the balance will remain unchanged. + /// NB: Do not use this function in the engine. This is used at the consensus level to update fee substates in + /// place. + #[must_use] + pub fn deposit_direct(&mut self, amount: Amount) -> bool { + if amount.is_negative() { + return false; + } + match self.amount.checked_add(amount) { + Some(new_amount) => { + self.amount = new_amount; + true + }, + None => false, + } + } + + pub fn amount(&self) -> Amount { + self.amount + } + + pub fn claim_public_key(&self) -> &RistrettoPublicKeyBytes { + &self.claim_public_key } + /// Withdraws all the funds from the pool and returns them in a ResourceContainer. + /// If the pool has insufficient funds, an error is returned. + /// This function is used in the engine to withdraw the funds from the pool and create a Bucket. pub fn withdraw_all(&mut self) -> Result { if self.amount.is_zero() { return Err(ResourceError::InsufficientBalance { @@ -142,3 +182,14 @@ impl ValidatorFeePool { }) } } + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[cfg_attr( + feature = "ts", + derive(ts_rs::TS), + ts(export, export_to = "../../bindings/src/types/") +)] +pub struct ValidatorFeeWithdrawal { + pub address: ValidatorFeePoolAddress, + pub amount: Amount, +} diff --git a/dan_layer/state_store_sqlite/migrations/2023-06-08-091819_create_state_store/up.sql b/dan_layer/state_store_sqlite/migrations/2023-06-08-091819_create_state_store/up.sql index 8a0a81984b..5345e46e90 100644 --- a/dan_layer/state_store_sqlite/migrations/2023-06-08-091819_create_state_store/up.sql +++ b/dan_layer/state_store_sqlite/migrations/2023-06-08-091819_create_state_store/up.sql @@ -526,7 +526,7 @@ CREATE TABLE validator_epoch_stats created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ); -CREATE UNIQUE INDEX participation_shares_uniq_idx_epoch_public_key on validator_epoch_stats (epoch, public_key); +CREATE UNIQUE INDEX validator_epoch_stats_uniq_idx_epoch_public_key on validator_epoch_stats (epoch, public_key); CREATE TABLE evicted_nodes ( diff --git a/dan_layer/state_store_sqlite/src/schema.rs b/dan_layer/state_store_sqlite/src/schema.rs index 87e1b03eea..e98469c65c 100644 --- a/dan_layer/state_store_sqlite/src/schema.rs +++ b/dan_layer/state_store_sqlite/src/schema.rs @@ -542,6 +542,16 @@ diesel::table! { } } +diesel::table! { + validator_fee_changes (id) { + id -> Integer, + block_id -> Text, + address -> Text, + amount -> BigInt, + created_at -> Timestamp, + } +} + diesel::table! { votes (id) { id -> Integer, @@ -594,5 +604,6 @@ diesel::allow_tables_to_appear_in_same_query!( transaction_pool_state_updates, transactions, validator_epoch_stats, + validator_fee_changes, votes, ); diff --git a/dan_layer/static_epoch_oracle/Cargo.toml b/dan_layer/static_epoch_oracle/Cargo.toml deleted file mode 100644 index 737a709dc7..0000000000 --- a/dan_layer/static_epoch_oracle/Cargo.toml +++ /dev/null @@ -1,10 +0,0 @@ -[package] -name = "tari_static_epoch_oracle" -version.workspace = true -edition.workspace = true -authors.workspace = true -repository.workspace = true -license.workspace = true - -[dependencies] -tari_epoch_manager = { workspace = true } \ No newline at end of file diff --git a/dan_layer/static_epoch_oracle/src/config.rs b/dan_layer/static_epoch_oracle/src/config.rs deleted file mode 100644 index 550f5b3cce..0000000000 --- a/dan_layer/static_epoch_oracle/src/config.rs +++ /dev/null @@ -1,4 +0,0 @@ -// Copyright 2025 The Tari Project -// SPDX-License-Identifier: BSD-3-Clause - -pub struct Config {} diff --git a/dan_layer/static_epoch_oracle/src/lib.rs b/dan_layer/static_epoch_oracle/src/lib.rs deleted file mode 100644 index 44921285bd..0000000000 --- a/dan_layer/static_epoch_oracle/src/lib.rs +++ /dev/null @@ -1,5 +0,0 @@ -// Copyright 2025 The Tari Project -// SPDX-License-Identifier: BSD-3-Clause - -mod config; -mod oracle; diff --git a/dan_layer/static_epoch_oracle/src/oracle.rs b/dan_layer/static_epoch_oracle/src/oracle.rs deleted file mode 100644 index 5d303d6fbe..0000000000 --- a/dan_layer/static_epoch_oracle/src/oracle.rs +++ /dev/null @@ -1,19 +0,0 @@ -// Copyright 2025 The Tari Project -// SPDX-License-Identifier: BSD-3-Clause - -use std::time::Duration; - -use tari_epoch_manager::epoch_event_oracle::{EpochEvent, EpochEventOracle}; - -use crate::config::Config; - -pub struct StaticEpochOracle { - _epoch_time: Duration, - _config: Config, -} - -impl EpochEventOracle for StaticEpochOracle { - async fn next_epoch_event(&mut self) -> Option { - None - } -} diff --git a/dan_layer/storage/src/consensus_models/block.rs b/dan_layer/storage/src/consensus_models/block.rs index 6b4854e052..be8dc9cd8b 100644 --- a/dan_layer/storage/src/consensus_models/block.rs +++ b/dan_layer/storage/src/consensus_models/block.rs @@ -638,11 +638,11 @@ impl Block { tx.blocks_delete(block_id) } - pub fn commit_diff( - &self, - tx: &mut TTx, - block_diff: BlockDiff, - ) -> Result<(), StorageError> { + pub fn commit_diff(&self, tx: &mut TTx, block_diff: BlockDiff) -> Result<(), StorageError> + where + TTx: StateStoreWriteTransaction + Deref, + TTx::Target: StateStoreReadTransaction, + { if block_diff.block_id() != self.id() { return Err(StorageError::QueryError { reason: format!( @@ -666,7 +666,9 @@ impl Block { block_diff.remove(tx)?; } - for change in block_diff.into_changes() { + let BlockDiff { changes, .. } = block_diff; + + for change in changes { match change { SubstateChange::Up { id, diff --git a/dan_layer/storage/src/consensus_models/block_diff.rs b/dan_layer/storage/src/consensus_models/block_diff.rs index f635389c27..dc9f7ab96a 100644 --- a/dan_layer/storage/src/consensus_models/block_diff.rs +++ b/dan_layer/storage/src/consensus_models/block_diff.rs @@ -36,16 +36,13 @@ impl BlockDiff { self.changes.is_empty() } - pub fn into_filtered(self, info: &CommitteeInfo) -> Self { + pub fn into_filtered(self, committee: &CommitteeInfo) -> Self { Self { block_id: self.block_id, changes: self .changes .into_iter() - // Commit all substates included in this shard. Every involved validator commits the transaction receipt. - .filter(|change| - info.includes_substate_id(change.versioned_substate_id().substate_id()) - ) + .filter(|change| committee.shard_group().contains_or_global(&change.shard())) .collect(), } } @@ -64,7 +61,7 @@ impl BlockDiff { } impl BlockDiff { - pub fn insert_record( + pub fn insert( tx: &mut TTx, block_id: &BlockId, changes: &[SubstateChange], diff --git a/dan_layer/storage/src/consensus_models/lock_intent.rs b/dan_layer/storage/src/consensus_models/lock_intent.rs index 8f0ce92c9d..9f6652adb7 100644 --- a/dan_layer/storage/src/consensus_models/lock_intent.rs +++ b/dan_layer/storage/src/consensus_models/lock_intent.rs @@ -139,6 +139,12 @@ impl LockIntent for &VersionedSubstateIdLockIntent { } } +impl AsRef for VersionedSubstateIdLockIntent { + fn as_ref(&self) -> &SubstateId { + self.substate_id() + } +} + #[derive(Debug, Clone, Copy)] pub struct RequireLockIntentRef<'a> { substate_id: &'a SubstateId, diff --git a/dan_layer/storage/src/consensus_models/substate_change.rs b/dan_layer/storage/src/consensus_models/substate_change.rs index 2e35e18cab..0857566fda 100644 --- a/dan_layer/storage/src/consensus_models/substate_change.rs +++ b/dan_layer/storage/src/consensus_models/substate_change.rs @@ -13,12 +13,14 @@ pub enum SubstateChange { Up { id: VersionedSubstateId, shard: Shard, + // TODO: determine if this is needed or if it can be removed transaction_id: TransactionId, substate: Substate, }, Down { id: VersionedSubstateId, shard: Shard, + // TODO: determine if this is needed or if it can be removed transaction_id: TransactionId, }, } @@ -67,16 +69,16 @@ impl SubstateChange { matches!(self, SubstateChange::Up { .. }) } - pub fn up(&self) -> Option<&Substate> { + pub fn up_substate(&self) -> Option<&Substate> { match self { SubstateChange::Up { substate, .. } => Some(substate), _ => None, } } - pub fn down(&self) -> Option<&VersionedSubstateId> { + pub fn up_mut(&mut self) -> Option<&mut Substate> { match self { - SubstateChange::Down { id, .. } => Some(id), + SubstateChange::Up { substate, .. } => Some(substate), _ => None, } } diff --git a/dan_layer/template_lib/src/models/amount.rs b/dan_layer/template_lib/src/models/amount.rs index 5635d07a44..3f6038e1e6 100644 --- a/dan_layer/template_lib/src/models/amount.rs +++ b/dan_layer/template_lib/src/models/amount.rs @@ -20,7 +20,7 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -use std::cmp; +use std::{cmp, ops::Neg}; use newtype_ops::newtype_ops; use serde::{Deserialize, Serialize}; @@ -32,6 +32,7 @@ use tari_template_abi::rust::{ /// Represents an integer quantity of any fungible or non-fungible resource #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize, Default)] +#[cfg_attr(feature = "borsh", derive(borsh::BorshSerialize))] #[serde(transparent)] #[cfg_attr( feature = "ts", @@ -159,6 +160,14 @@ impl From for Amount { } } +impl Neg for Amount { + type Output = Self; + + fn neg(self) -> Self::Output { + Amount(-self.0) + } +} + newtype_ops! { [Amount] {add sub mul div} {:=} Self Self } newtype_ops! { [Amount] {add sub mul div} {:=} &Self &Self } newtype_ops! { [Amount] {add sub mul div} {:=} Self &Self } diff --git a/dan_layer/transaction/src/builder.rs b/dan_layer/transaction/src/builder.rs index 4d43bb2b2d..d54eb5da3d 100644 --- a/dan_layer/transaction/src/builder.rs +++ b/dan_layer/transaction/src/builder.rs @@ -6,8 +6,8 @@ use tari_dan_common_types::{Epoch, SubstateRequirement}; use tari_engine_types::{ confidential::ConfidentialClaim, instruction::Instruction, - vn_fee_pool::ValidatorFeePoolAddress, TemplateAddress, + ValidatorFeePoolAddress, }; use tari_template_lib::{ args, diff --git a/utilities/tariswap_test_bench/src/runner.rs b/utilities/tariswap_test_bench/src/runner.rs index f231b97d5e..77bd53699b 100644 --- a/utilities/tariswap_test_bench/src/runner.rs +++ b/utilities/tariswap_test_bench/src/runner.rs @@ -105,6 +105,7 @@ impl Runner { pub fn new_transaction_builder(&self) -> TransactionBuilder { // 0x10 is LocalNet - avoiding having to include tari_common + // Igor is 0x24 Transaction::builder().for_network(0x10) } } diff --git a/utilities/tariswap_test_bench/src/templates.rs b/utilities/tariswap_test_bench/src/templates.rs index 26287cfd9f..5bb8679aac 100644 --- a/utilities/tariswap_test_bench/src/templates.rs +++ b/utilities/tariswap_test_bench/src/templates.rs @@ -18,14 +18,14 @@ pub async fn get_templates(cli: &CommonArgs) -> anyhow::Result<(TemplateMetadata } else { templates .iter() - .find(|t| t.name == "TariSwapPool") + .find(|t| t.name.eq_ignore_ascii_case("TariSwapPool")) .ok_or(anyhow::anyhow!("Tariswap template not found"))? .clone() }; let faucet = templates .iter() - .find(|t| t.name == "TestFaucet") + .find(|t| t.name.eq_ignore_ascii_case("TestFaucet")) .ok_or(anyhow::anyhow!("Faucet template not found"))? .clone();