diff --git a/Cargo.lock b/Cargo.lock index 9f1ea30e3..df2dcd5c7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4012,6 +4012,7 @@ checksum = "d530e1a18b1cb4c484e6e34556a0d948706958449fca0cab753d649f2bce3d1f" dependencies = [ "equivalent", "hashbrown 0.14.3", + "serde", ] [[package]] @@ -9192,6 +9193,7 @@ version = "0.3.0" dependencies = [ "anyhow", "chrono", + "indexmap 2.1.0", "log", "rand", "serde", @@ -9740,6 +9742,7 @@ dependencies = [ "tari_common_types", "tari_dan_common_types", "tari_dan_storage", + "tari_engine_types", "tari_transaction", "tari_utilities", "thiserror", diff --git a/applications/tari_dan_wallet_daemon/src/handlers/accounts.rs b/applications/tari_dan_wallet_daemon/src/handlers/accounts.rs index 70eabfae0..c2c9f1f6b 100644 --- a/applications/tari_dan_wallet_daemon/src/handlers/accounts.rs +++ b/applications/tari_dan_wallet_daemon/src/handlers/accounts.rs @@ -107,6 +107,9 @@ pub async fn handle_create( .locate_dependent_substates(&[&default_account.address]) .await?; + // We aren't mutating the resources + let (input_refs, inputs) = inputs.into_iter().partition::, _>(|s| s.address.is_resource()); + let signing_key_index = req.key_id.unwrap_or(default_account.key_index); let signing_key = key_manager_api.derive_key(key_manager::TRANSACTION_BRANCH, signing_key_index)?; @@ -127,6 +130,7 @@ pub async fn handle_create( let transaction = Transaction::builder() .fee_transaction_pay_from_component(default_account.address.as_component_address().unwrap(), max_fee) .call_function(*ACCOUNT_TEMPLATE_ADDRESS, "create", args![owner_token]) + .with_input_refs(input_refs.iter().map(|s| ShardId::from_address(&s.address, s.version))) .with_inputs( inputs .iter() diff --git a/dan_layer/engine_types/src/lock.rs b/dan_layer/engine_types/src/lock.rs index db599e324..a4df4765c 100644 --- a/dan_layer/engine_types/src/lock.rs +++ b/dan_layer/engine_types/src/lock.rs @@ -3,8 +3,10 @@ use std::fmt::Display; +use tari_bor::{Deserialize, Serialize}; + pub type LockId = u32; -#[derive(Debug, Clone, Copy, serde::Serialize, serde::Deserialize)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] pub enum LockFlag { Read, Write, diff --git a/dan_layer/state_store_sqlite/Cargo.toml b/dan_layer/state_store_sqlite/Cargo.toml index aceab9e51..7793d312a 100644 --- a/dan_layer/state_store_sqlite/Cargo.toml +++ b/dan_layer/state_store_sqlite/Cargo.toml @@ -11,6 +11,7 @@ license.workspace = true tari_dan_storage = { workspace = true } tari_dan_common_types = { workspace = true } tari_transaction = { workspace = true } +tari_engine_types = { workspace = true } # TODO: needed for FixedHash tari_common_types = { workspace = true } tari_utilities = { workspace = true } diff --git a/dan_layer/state_store_sqlite/src/reader.rs b/dan_layer/state_store_sqlite/src/reader.rs index 67fb982e4..f84df68d9 100644 --- a/dan_layer/state_store_sqlite/src/reader.rs +++ b/dan_layer/state_store_sqlite/src/reader.rs @@ -58,12 +58,13 @@ use tari_dan_storage::{ StateStoreReadTransaction, StorageError, }; +use tari_engine_types::lock::LockFlag; use tari_transaction::TransactionId; use tari_utilities::ByteArray; use crate::{ error::SqliteStorageError, - serialization::{deserialize_hex_try_from, deserialize_json, parse_from_string, serialize_hex}, + serialization::{deserialize_hex_try_from, deserialize_json, serialize_hex}, sql_models, sqlite_transaction::SqliteTransaction, }; @@ -1183,20 +1184,18 @@ impl StateStoreReadTransa updates.len() ); - let mut used_substates = HashSet::::new(); - let mut processed_substates = HashMap::>::new(); + let mut used_substates = HashMap::::new(); + let mut processed_substates = HashMap::>::with_capacity(updates.len()); for (tx_id, update) in &updates { - if let Some(Decision::Abort) = update - .local_decision - .as_ref() - .map(|decision| parse_from_string(decision.as_str())) - .transpose()? - { + if update.local_decision.as_deref() == Some(Decision::Abort.as_str()) { // The aborted transaction don't lock any substates continue; } let evidence = deserialize_json::(&update.evidence)?; - let evidence = evidence.shards_iter().copied().collect::>(); + let evidence = evidence + .iter() + .map(|(shard, evidence)| (*shard, evidence.lock)) + .collect::>(); processed_substates.insert(deserialize_hex_try_from(tx_id)?, evidence); } @@ -1206,26 +1205,56 @@ impl StateStoreReadTransa let maybe_update = updates.remove(&rec.transaction_id); match rec.try_convert(maybe_update) { Ok(rec) => { - if rec.is_ready() { - let tx_substates: HashSet = rec - .transaction() - .evidence - .shards_iter() - .copied() - .collect::>(); - if tx_substates.is_disjoint(&used_substates) && - processed_substates.iter().all(|(tx_id, substates)| { - tx_id == rec.transaction_id() || tx_substates.is_disjoint(substates) - }) - { - used_substates.extend(tx_substates); - Some(Ok(rec)) + if !rec.is_ready() { + return None; + } + + let tx_substates = rec + .transaction() + .evidence + .iter() + .map(|(shard, evidence)| (*shard, evidence.lock)) + .collect::>(); + + // Are there any conflicts between the currently selected set and this transaction? + if tx_substates.iter().any(|(shard, lock)| { + if lock.is_write() { + // Write lock must have no conflicts + used_substates.contains_key(shard) } else { - // TODO: If we don't switch to "no version" transaction, then we can abort these here. - // That also requires changes to the on_ready_to_vote_on_local_block - None + // If there is a Shard conflict, then it must not be a write lock + used_substates + .get(shard) + .map(|tx_lock| tx_lock.is_write()) + .unwrap_or(false) } + }) { + return None; + } + + // Are there any conflicts between this transaction and other transactions to be included in the + // block? + if processed_substates + .iter() + // Check other transactions + .filter(|(tx_id, _)| *tx_id != rec.transaction_id()) + .all(|(_, evidence)| { + evidence.iter().all(|(shard, lock)| { + if lock.is_write() { + // Write lock must have no conflicts + !tx_substates.contains_key(shard) + } else { + // If there is a Shard conflict, then it must be a read lock + tx_substates.get(shard).map(|tx_lock| tx_lock.is_read()).unwrap_or(true) + } + }) + }) + { + used_substates.extend(tx_substates); + Some(Ok(rec)) } else { + // TODO: If we don't switch to "no version" transaction, then we can abort these here. + // That also requires changes to the on_ready_to_vote_on_local_block None } }, diff --git a/dan_layer/storage/Cargo.toml b/dan_layer/storage/Cargo.toml index 10197131e..8d2ec25da 100644 --- a/dan_layer/storage/Cargo.toml +++ b/dan_layer/storage/Cargo.toml @@ -21,6 +21,7 @@ tari_crypto = { workspace = true } anyhow = { workspace = true } chrono = { workspace = true } +indexmap = { workspace = true, features = ["serde"] } log = { workspace = true } rand = { workspace = true } thiserror = { workspace = true } diff --git a/dan_layer/storage/src/consensus_models/command.rs b/dan_layer/storage/src/consensus_models/command.rs index 8d0f8dd5a..ececbf07d 100644 --- a/dan_layer/storage/src/consensus_models/command.rs +++ b/dan_layer/storage/src/consensus_models/command.rs @@ -3,12 +3,13 @@ use std::{ cmp::Ordering, - collections::BTreeMap, fmt::{Display, Formatter}, }; +use indexmap::{IndexMap, IndexSet}; use serde::{Deserialize, Serialize}; use tari_dan_common_types::ShardId; +use tari_engine_types::lock::LockFlag; use tari_transaction::TransactionId; use super::ForeignProposal; @@ -18,15 +19,15 @@ use crate::{ StorageError, }; -#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, Default)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)] pub struct Evidence { - evidence: BTreeMap>, + evidence: IndexMap, } impl Evidence { - pub const fn empty() -> Self { + pub fn empty() -> Self { Self { - evidence: BTreeMap::new(), + evidence: IndexMap::new(), } } @@ -43,15 +44,22 @@ impl Evidence { self.evidence.len() } + pub fn get(&self, shard_id: &ShardId) -> Option<&ShardEvidence> { + self.evidence.get(shard_id) + } + pub fn num_complete_shards(&self) -> usize { - self.evidence.values().filter(|qc_ids| !qc_ids.is_empty()).count() + self.evidence + .values() + .filter(|evidence| !evidence.qc_ids.is_empty()) + .count() } - pub fn iter(&self) -> impl Iterator)> { + pub fn iter(&self) -> impl Iterator { self.evidence.iter() } - pub fn iter_mut(&mut self) -> impl Iterator)> { + pub fn iter_mut(&mut self) -> impl Iterator { self.evidence.iter_mut() } @@ -60,31 +68,60 @@ impl Evidence { } pub fn qc_ids_iter(&self) -> impl Iterator + '_ { - self.evidence.values().flatten() + self.evidence.values().flat_map(|e| e.qc_ids.iter()) } pub fn merge(&mut self, other: Evidence) -> &mut Self { - for (shard_id, qc_ids) in other.evidence { - let entry = self.evidence.entry(shard_id).or_default(); - for qc_id in qc_ids { - if !entry.contains(&qc_id) { - entry.push(qc_id); - } - } + for (shard_id, shard_evidence) in other.evidence { + let entry = self.evidence.entry(shard_id).or_insert_with(|| ShardEvidence { + qc_ids: IndexSet::new(), + lock: shard_evidence.lock, + }); + entry.qc_ids.extend(shard_evidence.qc_ids); } self } } -impl FromIterator<(ShardId, Vec)> for Evidence { - fn from_iter)>>(iter: T) -> Self { +impl FromIterator<(ShardId, ShardEvidence)> for Evidence { + fn from_iter>(iter: T) -> Self { Evidence { evidence: iter.into_iter().collect(), } } } -#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +impl Extend<(ShardId, ShardEvidence)> for Evidence { + fn extend>(&mut self, iter: T) { + self.evidence.extend(iter.into_iter()) + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct ShardEvidence { + pub qc_ids: IndexSet, + pub lock: LockFlag, +} + +impl ShardEvidence { + pub fn new(qc_ids: IndexSet, lock: LockFlag) -> Self { + Self { qc_ids, lock } + } + + pub fn is_empty(&self) -> bool { + self.qc_ids.is_empty() + } + + pub fn contains(&self, qc_id: &QcId) -> bool { + self.qc_ids.contains(qc_id) + } + + pub fn insert(&mut self, qc_id: QcId) { + self.qc_ids.insert(qc_id); + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct TransactionAtom { pub id: TransactionId, pub decision: Decision, @@ -116,7 +153,7 @@ impl Display for TransactionAtom { } } -#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub enum Command { /// Command to prepare a transaction. Prepare(TransactionAtom), diff --git a/dan_layer/storage/src/consensus_models/executed_transaction.rs b/dan_layer/storage/src/consensus_models/executed_transaction.rs index 046c8a031..469f80695 100644 --- a/dan_layer/storage/src/consensus_models/executed_transaction.rs +++ b/dan_layer/storage/src/consensus_models/executed_transaction.rs @@ -8,13 +8,17 @@ use std::{ time::Duration, }; +use indexmap::IndexSet; use serde::{Deserialize, Serialize}; use tari_dan_common_types::{optional::Optional, ShardId}; -use tari_engine_types::commit_result::{ExecuteResult, FinalizeResult, RejectReason}; +use tari_engine_types::{ + commit_result::{ExecuteResult, FinalizeResult, RejectReason}, + lock::LockFlag, +}; use tari_transaction::{Transaction, TransactionId}; use crate::{ - consensus_models::{Decision, Evidence, TransactionAtom, TransactionRecord}, + consensus_models::{Decision, Evidence, ShardEvidence, TransactionAtom, TransactionRecord}, StateStoreReadTransaction, StateStoreWriteTransaction, StorageError, @@ -131,11 +135,29 @@ impl ExecutedTransaction { } pub fn to_initial_evidence(&self) -> Evidence { - self.transaction - .all_inputs_iter() - .chain(self.resulting_outputs()) - .map(|shard| (*shard, vec![])) - .collect() + let mut evidence = Evidence::empty(); + evidence.extend(self.transaction.inputs().iter().map(|input| { + (*input, ShardEvidence { + qc_ids: IndexSet::new(), + lock: LockFlag::Write, + }) + })); + + evidence.extend(self.transaction.input_refs().iter().map(|input_ref| { + (*input_ref, ShardEvidence { + qc_ids: IndexSet::new(), + lock: LockFlag::Read, + }) + })); + + evidence.extend(self.resulting_outputs.iter().map(|output| { + (*output, ShardEvidence { + qc_ids: IndexSet::new(), + lock: LockFlag::Write, + }) + })); + + evidence } pub fn is_finalized(&self) -> bool { diff --git a/dan_layer/storage/src/consensus_models/transaction_decision.rs b/dan_layer/storage/src/consensus_models/transaction_decision.rs index 10d391b92..ac549a367 100644 --- a/dan_layer/storage/src/consensus_models/transaction_decision.rs +++ b/dan_layer/storage/src/consensus_models/transaction_decision.rs @@ -32,14 +32,18 @@ impl Decision { Decision::Abort => Decision::Abort, } } + + pub const fn as_str(&self) -> &'static str { + match self { + Decision::Commit => "Commit", + Decision::Abort => "Abort", + } + } } impl Display for Decision { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - match self { - Decision::Commit => write!(f, "Commit"), - Decision::Abort => write!(f, "Abort"), - } + f.write_str(self.as_str()) } } diff --git a/dan_layer/storage/src/consensus_models/transaction_pool.rs b/dan_layer/storage/src/consensus_models/transaction_pool.rs index 370434f6a..5b016fa8b 100644 --- a/dan_layer/storage/src/consensus_models/transaction_pool.rs +++ b/dan_layer/storage/src/consensus_models/transaction_pool.rs @@ -350,9 +350,9 @@ impl TransactionPoolRecord { pub fn add_evidence(&mut self, committee_shard: &CommitteeShard, qc_id: QcId) -> &mut Self { let evidence = &mut self.transaction.evidence; - for (shard, qcs_mut) in evidence.iter_mut() { + for (shard, evidence_mut) in evidence.iter_mut() { if committee_shard.includes_shard(shard) { - qcs_mut.push(qc_id); + evidence_mut.qc_ids.insert(qc_id); } }