fix(consensus)!: allow multiple read-only shard references in proposals (#894)

Description
---
Includes the lock type in evidence.
Checks lock types when proposing, so that multiple transactions may reference
the same substate read-only (a condensed view of the new evidence types follows).
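
For reference, a condensed view of the new evidence types, adapted from the
`dan_layer/storage/src/consensus_models/command.rs` change further down in this
diff. The `QcId` stand-in below is a placeholder for the QC identifier defined
elsewhere in `tari_dan_storage`; everything else mirrors the diff:

```rust
use indexmap::{IndexMap, IndexSet};
use tari_dan_common_types::ShardId;
use tari_engine_types::lock::LockFlag;

// Placeholder for the QC identifier defined in tari_dan_storage's consensus models.
type QcId = [u8; 32];

// Each shard's evidence now records the lock the transaction needs on that
// shard (Read or Write) alongside the QCs collected for it.
pub struct Evidence {
    evidence: IndexMap<ShardId, ShardEvidence>,
}

pub struct ShardEvidence {
    pub qc_ids: IndexSet<QcId>,
    pub lock: LockFlag,
}
```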

Motivation and Context
---
Evidence for each shard within a TransactionAtom now includes the lock
type.
The proposal DB query has been updated to check each lock type and ensure
that either no conflicts occur or, where they do, all conflicting locks are
read locks (a minimal sketch of the rule follows).
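
A minimal, self-contained sketch of that compatibility rule, with simplified
names (the actual check lives in `dan_layer/state_store_sqlite/src/reader.rs`
below; `ShardId` is replaced by a byte-array stand-in):

```rust
use std::collections::HashMap;

#[derive(Clone, Copy, PartialEq, Eq)]
enum LockFlag {
    Read,
    Write,
}

// Stand-in for tari_dan_common_types::ShardId.
type ShardId = [u8; 32];

/// Two transactions conflict only if they touch the same shard and at least
/// one of the two locks on that shard is a write; overlapping read locks are
/// allowed, which is what lets multiple read-only input refs share a block.
fn conflicts(selected: &HashMap<ShardId, LockFlag>, candidate: &HashMap<ShardId, LockFlag>) -> bool {
    candidate.iter().any(|(shard, lock)| match selected.get(shard) {
        Some(existing) => *lock == LockFlag::Write || *existing == LockFlag::Write,
        None => false,
    })
}
```

The proposer keeps a running map of shards locked by already-selected
transactions and skips any candidate for which this kind of check reports a
conflict.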

How Has This Been Tested?
---
Ran the stress test from #880. Previously, after #885 was merged, funding
the tariswap components would take a very long time (I never ran it to
completion; one run went 30 minutes without finishing). With this PR,
funding took roughly a minute in my test. Swap batches are also reaching
finalization within an acceptable timeframe.

What process can a PR reviewer use to test or verify this change?
---
Submit multiple transactions that use the same substate as an input ref
and check that they can be included in the same block (a rough sketch
follows).
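
A rough sketch of that setup, assuming only the `Transaction::builder()` and
`.with_input_refs(..)` calls visible in the wallet daemon change below; fee
instructions, component calls, signing and submission are elided and will
differ in a real wallet or CLI flow:

```rust
use tari_dan_common_types::ShardId;
use tari_transaction::Transaction;

// Start two transactions that both take the same substate as a read-only input
// ref. Before this change the proposer would only admit one of them per block;
// with it, both may be proposed together.
fn sketch(shared_ref: ShardId) {
    let _tx_a = Transaction::builder().with_input_refs(std::iter::once(shared_ref));
    let _tx_b = Transaction::builder().with_input_refs(std::iter::once(shared_ref));
    // Add fees, instructions and signatures as in handle_create below, then
    // submit both and confirm they land in the same block.
}
```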

Breaking Changes
---

- [ ] None
- [x] Requires data directory to be deleted (Evidence struct changed)
- [ ] Other - Please specify
sdbondi authored Jan 16, 2024
1 parent 9d1ebb4 commit 651016d
Showing 10 changed files with 164 additions and 61 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

4 changes: 4 additions & 0 deletions applications/tari_dan_wallet_daemon/src/handlers/accounts.rs
@@ -107,6 +107,9 @@ pub async fn handle_create(
.locate_dependent_substates(&[&default_account.address])
.await?;

// We aren't mutating the resources
let (input_refs, inputs) = inputs.into_iter().partition::<Vec<_>, _>(|s| s.address.is_resource());

let signing_key_index = req.key_id.unwrap_or(default_account.key_index);
let signing_key = key_manager_api.derive_key(key_manager::TRANSACTION_BRANCH, signing_key_index)?;

@@ -127,6 +130,7 @@ pub async fn handle_create(
let transaction = Transaction::builder()
.fee_transaction_pay_from_component(default_account.address.as_component_address().unwrap(), max_fee)
.call_function(*ACCOUNT_TEMPLATE_ADDRESS, "create", args![owner_token])
.with_input_refs(input_refs.iter().map(|s| ShardId::from_address(&s.address, s.version)))
.with_inputs(
inputs
.iter()
4 changes: 3 additions & 1 deletion dan_layer/engine_types/src/lock.rs
@@ -3,8 +3,10 @@

use std::fmt::Display;

use tari_bor::{Deserialize, Serialize};

pub type LockId = u32;
#[derive(Debug, Clone, Copy, serde::Serialize, serde::Deserialize)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum LockFlag {
Read,
Write,
1 change: 1 addition & 0 deletions dan_layer/state_store_sqlite/Cargo.toml
@@ -11,6 +11,7 @@ license.workspace = true
tari_dan_storage = { workspace = true }
tari_dan_common_types = { workspace = true }
tari_transaction = { workspace = true }
tari_engine_types = { workspace = true }
# TODO: needed for FixedHash
tari_common_types = { workspace = true }
tari_utilities = { workspace = true }
83 changes: 56 additions & 27 deletions dan_layer/state_store_sqlite/src/reader.rs
@@ -58,12 +58,13 @@ use tari_dan_storage::{
StateStoreReadTransaction,
StorageError,
};
use tari_engine_types::lock::LockFlag;
use tari_transaction::TransactionId;
use tari_utilities::ByteArray;

use crate::{
error::SqliteStorageError,
serialization::{deserialize_hex_try_from, deserialize_json, parse_from_string, serialize_hex},
serialization::{deserialize_hex_try_from, deserialize_json, serialize_hex},
sql_models,
sqlite_transaction::SqliteTransaction,
};
@@ -1183,20 +1184,18 @@ impl<TAddr: NodeAddressable + Serialize + DeserializeOwned> StateStoreReadTransa
updates.len()
);

let mut used_substates = HashSet::<ShardId>::new();
let mut processed_substates = HashMap::<TransactionId, HashSet<ShardId>>::new();
let mut used_substates = HashMap::<ShardId, LockFlag>::new();
let mut processed_substates = HashMap::<TransactionId, HashSet<(ShardId, _)>>::with_capacity(updates.len());
for (tx_id, update) in &updates {
if let Some(Decision::Abort) = update
.local_decision
.as_ref()
.map(|decision| parse_from_string(decision.as_str()))
.transpose()?
{
if update.local_decision.as_deref() == Some(Decision::Abort.as_str()) {
// Aborted transactions don't lock any substates
continue;
}
let evidence = deserialize_json::<Evidence>(&update.evidence)?;
let evidence = evidence.shards_iter().copied().collect::<HashSet<_>>();
let evidence = evidence
.iter()
.map(|(shard, evidence)| (*shard, evidence.lock))
.collect::<HashSet<(ShardId, _)>>();
processed_substates.insert(deserialize_hex_try_from(tx_id)?, evidence);
}

@@ -1206,26 +1205,56 @@
let maybe_update = updates.remove(&rec.transaction_id);
match rec.try_convert(maybe_update) {
Ok(rec) => {
if rec.is_ready() {
let tx_substates: HashSet<ShardId> = rec
.transaction()
.evidence
.shards_iter()
.copied()
.collect::<HashSet<_>>();
if tx_substates.is_disjoint(&used_substates) &&
processed_substates.iter().all(|(tx_id, substates)| {
tx_id == rec.transaction_id() || tx_substates.is_disjoint(substates)
})
{
used_substates.extend(tx_substates);
Some(Ok(rec))
if !rec.is_ready() {
return None;
}

let tx_substates = rec
.transaction()
.evidence
.iter()
.map(|(shard, evidence)| (*shard, evidence.lock))
.collect::<HashMap<_, _>>();

// Are there any conflicts between the currently selected set and this transaction?
if tx_substates.iter().any(|(shard, lock)| {
if lock.is_write() {
// Write lock must have no conflicts
used_substates.contains_key(shard)
} else {
// TODO: If we don't switch to "no version" transaction, then we can abort these here.
// That also requires changes to the on_ready_to_vote_on_local_block
None
// If there is a Shard conflict, then it must not be a write lock
used_substates
.get(shard)
.map(|tx_lock| tx_lock.is_write())
.unwrap_or(false)
}
}) {
return None;
}

// Are there any conflicts between this transaction and other transactions to be included in the
// block?
if processed_substates
.iter()
// Check other transactions
.filter(|(tx_id, _)| *tx_id != rec.transaction_id())
.all(|(_, evidence)| {
evidence.iter().all(|(shard, lock)| {
if lock.is_write() {
// Write lock must have no conflicts
!tx_substates.contains_key(shard)
} else {
// If there is a Shard conflict, then it must be a read lock
tx_substates.get(shard).map(|tx_lock| tx_lock.is_read()).unwrap_or(true)
}
})
})
{
used_substates.extend(tx_substates);
Some(Ok(rec))
} else {
// TODO: If we don't switch to "no version" transaction, then we can abort these here.
// That also requires changes to the on_ready_to_vote_on_local_block
None
}
},
1 change: 1 addition & 0 deletions dan_layer/storage/Cargo.toml
@@ -21,6 +21,7 @@ tari_crypto = { workspace = true }

anyhow = { workspace = true }
chrono = { workspace = true }
indexmap = { workspace = true, features = ["serde"] }
log = { workspace = true }
rand = { workspace = true }
thiserror = { workspace = true }
77 changes: 57 additions & 20 deletions dan_layer/storage/src/consensus_models/command.rs
@@ -3,12 +3,13 @@

use std::{
cmp::Ordering,
collections::BTreeMap,
fmt::{Display, Formatter},
};

use indexmap::{IndexMap, IndexSet};
use serde::{Deserialize, Serialize};
use tari_dan_common_types::ShardId;
use tari_engine_types::lock::LockFlag;
use tari_transaction::TransactionId;

use super::ForeignProposal;
@@ -18,15 +19,15 @@ use crate::{
StorageError,
};

#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct Evidence {
evidence: BTreeMap<ShardId, Vec<QcId>>,
evidence: IndexMap<ShardId, ShardEvidence>,
}

impl Evidence {
pub const fn empty() -> Self {
pub fn empty() -> Self {
Self {
evidence: BTreeMap::new(),
evidence: IndexMap::new(),
}
}

@@ -43,15 +44,22 @@ impl Evidence {
self.evidence.len()
}

pub fn get(&self, shard_id: &ShardId) -> Option<&ShardEvidence> {
self.evidence.get(shard_id)
}

pub fn num_complete_shards(&self) -> usize {
self.evidence.values().filter(|qc_ids| !qc_ids.is_empty()).count()
self.evidence
.values()
.filter(|evidence| !evidence.qc_ids.is_empty())
.count()
}

pub fn iter(&self) -> impl Iterator<Item = (&ShardId, &Vec<QcId>)> {
pub fn iter(&self) -> impl Iterator<Item = (&ShardId, &ShardEvidence)> {
self.evidence.iter()
}

pub fn iter_mut(&mut self) -> impl Iterator<Item = (&ShardId, &mut Vec<QcId>)> {
pub fn iter_mut(&mut self) -> impl Iterator<Item = (&ShardId, &mut ShardEvidence)> {
self.evidence.iter_mut()
}

@@ -60,31 +68,60 @@ impl Evidence {
}

pub fn qc_ids_iter(&self) -> impl Iterator<Item = &QcId> + '_ {
self.evidence.values().flatten()
self.evidence.values().flat_map(|e| e.qc_ids.iter())
}

pub fn merge(&mut self, other: Evidence) -> &mut Self {
for (shard_id, qc_ids) in other.evidence {
let entry = self.evidence.entry(shard_id).or_default();
for qc_id in qc_ids {
if !entry.contains(&qc_id) {
entry.push(qc_id);
}
}
for (shard_id, shard_evidence) in other.evidence {
let entry = self.evidence.entry(shard_id).or_insert_with(|| ShardEvidence {
qc_ids: IndexSet::new(),
lock: shard_evidence.lock,
});
entry.qc_ids.extend(shard_evidence.qc_ids);
}
self
}
}

impl FromIterator<(ShardId, Vec<QcId>)> for Evidence {
fn from_iter<T: IntoIterator<Item = (ShardId, Vec<QcId>)>>(iter: T) -> Self {
impl FromIterator<(ShardId, ShardEvidence)> for Evidence {
fn from_iter<T: IntoIterator<Item = (ShardId, ShardEvidence)>>(iter: T) -> Self {
Evidence {
evidence: iter.into_iter().collect(),
}
}
}

#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
impl Extend<(ShardId, ShardEvidence)> for Evidence {
fn extend<T: IntoIterator<Item = (ShardId, ShardEvidence)>>(&mut self, iter: T) {
self.evidence.extend(iter.into_iter())
}
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ShardEvidence {
pub qc_ids: IndexSet<QcId>,
pub lock: LockFlag,
}

impl ShardEvidence {
pub fn new(qc_ids: IndexSet<QcId>, lock: LockFlag) -> Self {
Self { qc_ids, lock }
}

pub fn is_empty(&self) -> bool {
self.qc_ids.is_empty()
}

pub fn contains(&self, qc_id: &QcId) -> bool {
self.qc_ids.contains(qc_id)
}

pub fn insert(&mut self, qc_id: QcId) {
self.qc_ids.insert(qc_id);
}
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TransactionAtom {
pub id: TransactionId,
pub decision: Decision,
@@ -116,7 +153,7 @@ impl Display for TransactionAtom {
}
}

#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum Command {
/// Command to prepare a transaction.
Prepare(TransactionAtom),
36 changes: 29 additions & 7 deletions dan_layer/storage/src/consensus_models/executed_transaction.rs
@@ -8,13 +8,17 @@ use std::{
time::Duration,
};

use indexmap::IndexSet;
use serde::{Deserialize, Serialize};
use tari_dan_common_types::{optional::Optional, ShardId};
use tari_engine_types::commit_result::{ExecuteResult, FinalizeResult, RejectReason};
use tari_engine_types::{
commit_result::{ExecuteResult, FinalizeResult, RejectReason},
lock::LockFlag,
};
use tari_transaction::{Transaction, TransactionId};

use crate::{
consensus_models::{Decision, Evidence, TransactionAtom, TransactionRecord},
consensus_models::{Decision, Evidence, ShardEvidence, TransactionAtom, TransactionRecord},
StateStoreReadTransaction,
StateStoreWriteTransaction,
StorageError,
Expand Down Expand Up @@ -131,11 +135,29 @@ impl ExecutedTransaction {
}

pub fn to_initial_evidence(&self) -> Evidence {
self.transaction
.all_inputs_iter()
.chain(self.resulting_outputs())
.map(|shard| (*shard, vec![]))
.collect()
let mut evidence = Evidence::empty();
evidence.extend(self.transaction.inputs().iter().map(|input| {
(*input, ShardEvidence {
qc_ids: IndexSet::new(),
lock: LockFlag::Write,
})
}));

evidence.extend(self.transaction.input_refs().iter().map(|input_ref| {
(*input_ref, ShardEvidence {
qc_ids: IndexSet::new(),
lock: LockFlag::Read,
})
}));

evidence.extend(self.resulting_outputs.iter().map(|output| {
(*output, ShardEvidence {
qc_ids: IndexSet::new(),
lock: LockFlag::Write,
})
}));

evidence
}

pub fn is_finalized(&self) -> bool {
(Remaining changed files not rendered.)
