From 651016dbb70d2f6169eea53b8ac2e3b32b2e5d7f Mon Sep 17 00:00:00 2001 From: Stan Bondi Date: Tue, 16 Jan 2024 14:15:56 +0400 Subject: [PATCH] fix(consensus)!: allow multiple read-only shard references in proposals (#894) Description --- Includes locks in evidence Checks locks when proposing to allow multiple input refs Motivation and Context --- Evidence for each shard within a TransactionAtom now includes the lock type. Updated the proposal db query to check each lock type and ensure that conflicts either don't occur or if they do they are all read locks. How Has This Been Tested? --- Ran a stress test from #880, previously after #885 was merged, funding the tariswap components would take a very long time (I've never actually run it to completion, but ran for 30 mins without completing). With this PR funding took roughly a minute on my test. Swap batches are also reaching finalization within an acceptable timeframe. What process can a PR reviewer use to test or verify this change? --- Submit multiple transactions which use a single substate as an input ref and check that they can be added to the same block. Breaking Changes --- - [ ] None - [x] Requires data directory to be deleted (Evidence struct changed) - [ ] Other - Please specify --- Cargo.lock | 3 + .../src/handlers/accounts.rs | 4 + dan_layer/engine_types/src/lock.rs | 4 +- dan_layer/state_store_sqlite/Cargo.toml | 1 + dan_layer/state_store_sqlite/src/reader.rs | 83 +++++++++++++------ dan_layer/storage/Cargo.toml | 1 + .../storage/src/consensus_models/command.rs | 77 ++++++++++++----- .../consensus_models/executed_transaction.rs | 36 ++++++-- .../consensus_models/transaction_decision.rs | 12 ++- .../src/consensus_models/transaction_pool.rs | 4 +- 10 files changed, 164 insertions(+), 61 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9f1ea30e3..df2dcd5c7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4012,6 +4012,7 @@ checksum = "d530e1a18b1cb4c484e6e34556a0d948706958449fca0cab753d649f2bce3d1f" dependencies = [ "equivalent", "hashbrown 0.14.3", + "serde", ] [[package]] @@ -9192,6 +9193,7 @@ version = "0.3.0" dependencies = [ "anyhow", "chrono", + "indexmap 2.1.0", "log", "rand", "serde", @@ -9740,6 +9742,7 @@ dependencies = [ "tari_common_types", "tari_dan_common_types", "tari_dan_storage", + "tari_engine_types", "tari_transaction", "tari_utilities", "thiserror", diff --git a/applications/tari_dan_wallet_daemon/src/handlers/accounts.rs b/applications/tari_dan_wallet_daemon/src/handlers/accounts.rs index 70eabfae0..c2c9f1f6b 100644 --- a/applications/tari_dan_wallet_daemon/src/handlers/accounts.rs +++ b/applications/tari_dan_wallet_daemon/src/handlers/accounts.rs @@ -107,6 +107,9 @@ pub async fn handle_create( .locate_dependent_substates(&[&default_account.address]) .await?; + // We aren't mutating the resources + let (input_refs, inputs) = inputs.into_iter().partition::, _>(|s| s.address.is_resource()); + let signing_key_index = req.key_id.unwrap_or(default_account.key_index); let signing_key = key_manager_api.derive_key(key_manager::TRANSACTION_BRANCH, signing_key_index)?; @@ -127,6 +130,7 @@ pub async fn handle_create( let transaction = Transaction::builder() .fee_transaction_pay_from_component(default_account.address.as_component_address().unwrap(), max_fee) .call_function(*ACCOUNT_TEMPLATE_ADDRESS, "create", args![owner_token]) + .with_input_refs(input_refs.iter().map(|s| ShardId::from_address(&s.address, s.version))) .with_inputs( inputs .iter() diff --git a/dan_layer/engine_types/src/lock.rs b/dan_layer/engine_types/src/lock.rs index db599e324..a4df4765c 100644 --- a/dan_layer/engine_types/src/lock.rs +++ b/dan_layer/engine_types/src/lock.rs @@ -3,8 +3,10 @@ use std::fmt::Display; +use tari_bor::{Deserialize, Serialize}; + pub type LockId = u32; -#[derive(Debug, Clone, Copy, serde::Serialize, serde::Deserialize)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] pub enum LockFlag { Read, Write, diff --git a/dan_layer/state_store_sqlite/Cargo.toml b/dan_layer/state_store_sqlite/Cargo.toml index aceab9e51..7793d312a 100644 --- a/dan_layer/state_store_sqlite/Cargo.toml +++ b/dan_layer/state_store_sqlite/Cargo.toml @@ -11,6 +11,7 @@ license.workspace = true tari_dan_storage = { workspace = true } tari_dan_common_types = { workspace = true } tari_transaction = { workspace = true } +tari_engine_types = { workspace = true } # TODO: needed for FixedHash tari_common_types = { workspace = true } tari_utilities = { workspace = true } diff --git a/dan_layer/state_store_sqlite/src/reader.rs b/dan_layer/state_store_sqlite/src/reader.rs index 67fb982e4..f84df68d9 100644 --- a/dan_layer/state_store_sqlite/src/reader.rs +++ b/dan_layer/state_store_sqlite/src/reader.rs @@ -58,12 +58,13 @@ use tari_dan_storage::{ StateStoreReadTransaction, StorageError, }; +use tari_engine_types::lock::LockFlag; use tari_transaction::TransactionId; use tari_utilities::ByteArray; use crate::{ error::SqliteStorageError, - serialization::{deserialize_hex_try_from, deserialize_json, parse_from_string, serialize_hex}, + serialization::{deserialize_hex_try_from, deserialize_json, serialize_hex}, sql_models, sqlite_transaction::SqliteTransaction, }; @@ -1183,20 +1184,18 @@ impl StateStoreReadTransa updates.len() ); - let mut used_substates = HashSet::::new(); - let mut processed_substates = HashMap::>::new(); + let mut used_substates = HashMap::::new(); + let mut processed_substates = HashMap::>::with_capacity(updates.len()); for (tx_id, update) in &updates { - if let Some(Decision::Abort) = update - .local_decision - .as_ref() - .map(|decision| parse_from_string(decision.as_str())) - .transpose()? - { + if update.local_decision.as_deref() == Some(Decision::Abort.as_str()) { // The aborted transaction don't lock any substates continue; } let evidence = deserialize_json::(&update.evidence)?; - let evidence = evidence.shards_iter().copied().collect::>(); + let evidence = evidence + .iter() + .map(|(shard, evidence)| (*shard, evidence.lock)) + .collect::>(); processed_substates.insert(deserialize_hex_try_from(tx_id)?, evidence); } @@ -1206,26 +1205,56 @@ impl StateStoreReadTransa let maybe_update = updates.remove(&rec.transaction_id); match rec.try_convert(maybe_update) { Ok(rec) => { - if rec.is_ready() { - let tx_substates: HashSet = rec - .transaction() - .evidence - .shards_iter() - .copied() - .collect::>(); - if tx_substates.is_disjoint(&used_substates) && - processed_substates.iter().all(|(tx_id, substates)| { - tx_id == rec.transaction_id() || tx_substates.is_disjoint(substates) - }) - { - used_substates.extend(tx_substates); - Some(Ok(rec)) + if !rec.is_ready() { + return None; + } + + let tx_substates = rec + .transaction() + .evidence + .iter() + .map(|(shard, evidence)| (*shard, evidence.lock)) + .collect::>(); + + // Are there any conflicts between the currently selected set and this transaction? + if tx_substates.iter().any(|(shard, lock)| { + if lock.is_write() { + // Write lock must have no conflicts + used_substates.contains_key(shard) } else { - // TODO: If we don't switch to "no version" transaction, then we can abort these here. - // That also requires changes to the on_ready_to_vote_on_local_block - None + // If there is a Shard conflict, then it must not be a write lock + used_substates + .get(shard) + .map(|tx_lock| tx_lock.is_write()) + .unwrap_or(false) } + }) { + return None; + } + + // Are there any conflicts between this transaction and other transactions to be included in the + // block? + if processed_substates + .iter() + // Check other transactions + .filter(|(tx_id, _)| *tx_id != rec.transaction_id()) + .all(|(_, evidence)| { + evidence.iter().all(|(shard, lock)| { + if lock.is_write() { + // Write lock must have no conflicts + !tx_substates.contains_key(shard) + } else { + // If there is a Shard conflict, then it must be a read lock + tx_substates.get(shard).map(|tx_lock| tx_lock.is_read()).unwrap_or(true) + } + }) + }) + { + used_substates.extend(tx_substates); + Some(Ok(rec)) } else { + // TODO: If we don't switch to "no version" transaction, then we can abort these here. + // That also requires changes to the on_ready_to_vote_on_local_block None } }, diff --git a/dan_layer/storage/Cargo.toml b/dan_layer/storage/Cargo.toml index 10197131e..8d2ec25da 100644 --- a/dan_layer/storage/Cargo.toml +++ b/dan_layer/storage/Cargo.toml @@ -21,6 +21,7 @@ tari_crypto = { workspace = true } anyhow = { workspace = true } chrono = { workspace = true } +indexmap = { workspace = true, features = ["serde"] } log = { workspace = true } rand = { workspace = true } thiserror = { workspace = true } diff --git a/dan_layer/storage/src/consensus_models/command.rs b/dan_layer/storage/src/consensus_models/command.rs index 8d0f8dd5a..ececbf07d 100644 --- a/dan_layer/storage/src/consensus_models/command.rs +++ b/dan_layer/storage/src/consensus_models/command.rs @@ -3,12 +3,13 @@ use std::{ cmp::Ordering, - collections::BTreeMap, fmt::{Display, Formatter}, }; +use indexmap::{IndexMap, IndexSet}; use serde::{Deserialize, Serialize}; use tari_dan_common_types::ShardId; +use tari_engine_types::lock::LockFlag; use tari_transaction::TransactionId; use super::ForeignProposal; @@ -18,15 +19,15 @@ use crate::{ StorageError, }; -#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, Default)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)] pub struct Evidence { - evidence: BTreeMap>, + evidence: IndexMap, } impl Evidence { - pub const fn empty() -> Self { + pub fn empty() -> Self { Self { - evidence: BTreeMap::new(), + evidence: IndexMap::new(), } } @@ -43,15 +44,22 @@ impl Evidence { self.evidence.len() } + pub fn get(&self, shard_id: &ShardId) -> Option<&ShardEvidence> { + self.evidence.get(shard_id) + } + pub fn num_complete_shards(&self) -> usize { - self.evidence.values().filter(|qc_ids| !qc_ids.is_empty()).count() + self.evidence + .values() + .filter(|evidence| !evidence.qc_ids.is_empty()) + .count() } - pub fn iter(&self) -> impl Iterator)> { + pub fn iter(&self) -> impl Iterator { self.evidence.iter() } - pub fn iter_mut(&mut self) -> impl Iterator)> { + pub fn iter_mut(&mut self) -> impl Iterator { self.evidence.iter_mut() } @@ -60,31 +68,60 @@ impl Evidence { } pub fn qc_ids_iter(&self) -> impl Iterator + '_ { - self.evidence.values().flatten() + self.evidence.values().flat_map(|e| e.qc_ids.iter()) } pub fn merge(&mut self, other: Evidence) -> &mut Self { - for (shard_id, qc_ids) in other.evidence { - let entry = self.evidence.entry(shard_id).or_default(); - for qc_id in qc_ids { - if !entry.contains(&qc_id) { - entry.push(qc_id); - } - } + for (shard_id, shard_evidence) in other.evidence { + let entry = self.evidence.entry(shard_id).or_insert_with(|| ShardEvidence { + qc_ids: IndexSet::new(), + lock: shard_evidence.lock, + }); + entry.qc_ids.extend(shard_evidence.qc_ids); } self } } -impl FromIterator<(ShardId, Vec)> for Evidence { - fn from_iter)>>(iter: T) -> Self { +impl FromIterator<(ShardId, ShardEvidence)> for Evidence { + fn from_iter>(iter: T) -> Self { Evidence { evidence: iter.into_iter().collect(), } } } -#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +impl Extend<(ShardId, ShardEvidence)> for Evidence { + fn extend>(&mut self, iter: T) { + self.evidence.extend(iter.into_iter()) + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct ShardEvidence { + pub qc_ids: IndexSet, + pub lock: LockFlag, +} + +impl ShardEvidence { + pub fn new(qc_ids: IndexSet, lock: LockFlag) -> Self { + Self { qc_ids, lock } + } + + pub fn is_empty(&self) -> bool { + self.qc_ids.is_empty() + } + + pub fn contains(&self, qc_id: &QcId) -> bool { + self.qc_ids.contains(qc_id) + } + + pub fn insert(&mut self, qc_id: QcId) { + self.qc_ids.insert(qc_id); + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct TransactionAtom { pub id: TransactionId, pub decision: Decision, @@ -116,7 +153,7 @@ impl Display for TransactionAtom { } } -#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub enum Command { /// Command to prepare a transaction. Prepare(TransactionAtom), diff --git a/dan_layer/storage/src/consensus_models/executed_transaction.rs b/dan_layer/storage/src/consensus_models/executed_transaction.rs index 046c8a031..469f80695 100644 --- a/dan_layer/storage/src/consensus_models/executed_transaction.rs +++ b/dan_layer/storage/src/consensus_models/executed_transaction.rs @@ -8,13 +8,17 @@ use std::{ time::Duration, }; +use indexmap::IndexSet; use serde::{Deserialize, Serialize}; use tari_dan_common_types::{optional::Optional, ShardId}; -use tari_engine_types::commit_result::{ExecuteResult, FinalizeResult, RejectReason}; +use tari_engine_types::{ + commit_result::{ExecuteResult, FinalizeResult, RejectReason}, + lock::LockFlag, +}; use tari_transaction::{Transaction, TransactionId}; use crate::{ - consensus_models::{Decision, Evidence, TransactionAtom, TransactionRecord}, + consensus_models::{Decision, Evidence, ShardEvidence, TransactionAtom, TransactionRecord}, StateStoreReadTransaction, StateStoreWriteTransaction, StorageError, @@ -131,11 +135,29 @@ impl ExecutedTransaction { } pub fn to_initial_evidence(&self) -> Evidence { - self.transaction - .all_inputs_iter() - .chain(self.resulting_outputs()) - .map(|shard| (*shard, vec![])) - .collect() + let mut evidence = Evidence::empty(); + evidence.extend(self.transaction.inputs().iter().map(|input| { + (*input, ShardEvidence { + qc_ids: IndexSet::new(), + lock: LockFlag::Write, + }) + })); + + evidence.extend(self.transaction.input_refs().iter().map(|input_ref| { + (*input_ref, ShardEvidence { + qc_ids: IndexSet::new(), + lock: LockFlag::Read, + }) + })); + + evidence.extend(self.resulting_outputs.iter().map(|output| { + (*output, ShardEvidence { + qc_ids: IndexSet::new(), + lock: LockFlag::Write, + }) + })); + + evidence } pub fn is_finalized(&self) -> bool { diff --git a/dan_layer/storage/src/consensus_models/transaction_decision.rs b/dan_layer/storage/src/consensus_models/transaction_decision.rs index 10d391b92..ac549a367 100644 --- a/dan_layer/storage/src/consensus_models/transaction_decision.rs +++ b/dan_layer/storage/src/consensus_models/transaction_decision.rs @@ -32,14 +32,18 @@ impl Decision { Decision::Abort => Decision::Abort, } } + + pub const fn as_str(&self) -> &'static str { + match self { + Decision::Commit => "Commit", + Decision::Abort => "Abort", + } + } } impl Display for Decision { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - match self { - Decision::Commit => write!(f, "Commit"), - Decision::Abort => write!(f, "Abort"), - } + f.write_str(self.as_str()) } } diff --git a/dan_layer/storage/src/consensus_models/transaction_pool.rs b/dan_layer/storage/src/consensus_models/transaction_pool.rs index 370434f6a..5b016fa8b 100644 --- a/dan_layer/storage/src/consensus_models/transaction_pool.rs +++ b/dan_layer/storage/src/consensus_models/transaction_pool.rs @@ -350,9 +350,9 @@ impl TransactionPoolRecord { pub fn add_evidence(&mut self, committee_shard: &CommitteeShard, qc_id: QcId) -> &mut Self { let evidence = &mut self.transaction.evidence; - for (shard, qcs_mut) in evidence.iter_mut() { + for (shard, evidence_mut) in evidence.iter_mut() { if committee_shard.includes_shard(shard) { - qcs_mut.push(qc_id); + evidence_mut.qc_ids.insert(qc_id); } }