Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(consensus)!: allow multiple read-only shard references in proposals #894

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions applications/tari_dan_wallet_daemon/src/handlers/accounts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ pub async fn handle_create(
.locate_dependent_substates(&[&default_account.address])
.await?;

// We aren't mutating the resources
let (input_refs, inputs) = inputs.into_iter().partition::<Vec<_>, _>(|s| s.address.is_resource());

let signing_key_index = req.key_id.unwrap_or(default_account.key_index);
let signing_key = key_manager_api.derive_key(key_manager::TRANSACTION_BRANCH, signing_key_index)?;

Expand All @@ -127,6 +130,7 @@ pub async fn handle_create(
let transaction = Transaction::builder()
.fee_transaction_pay_from_component(default_account.address.as_component_address().unwrap(), max_fee)
.call_function(*ACCOUNT_TEMPLATE_ADDRESS, "create", args![owner_token])
.with_input_refs(input_refs.iter().map(|s| ShardId::from_address(&s.address, s.version)))
.with_inputs(
inputs
.iter()
Expand Down
4 changes: 3 additions & 1 deletion dan_layer/engine_types/src/lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@

use std::fmt::Display;

use tari_bor::{Deserialize, Serialize};

pub type LockId = u32;
#[derive(Debug, Clone, Copy, serde::Serialize, serde::Deserialize)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum LockFlag {
Read,
Write,
Expand Down
1 change: 1 addition & 0 deletions dan_layer/state_store_sqlite/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ license.workspace = true
tari_dan_storage = { workspace = true }
tari_dan_common_types = { workspace = true }
tari_transaction = { workspace = true }
tari_engine_types = { workspace = true }
# TODO: needed for FixedHash
tari_common_types = { workspace = true }
tari_utilities = { workspace = true }
Expand Down
83 changes: 56 additions & 27 deletions dan_layer/state_store_sqlite/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,13 @@ use tari_dan_storage::{
StateStoreReadTransaction,
StorageError,
};
use tari_engine_types::lock::LockFlag;
use tari_transaction::TransactionId;
use tari_utilities::ByteArray;

use crate::{
error::SqliteStorageError,
serialization::{deserialize_hex_try_from, deserialize_json, parse_from_string, serialize_hex},
serialization::{deserialize_hex_try_from, deserialize_json, serialize_hex},
sql_models,
sqlite_transaction::SqliteTransaction,
};
Expand Down Expand Up @@ -1183,20 +1184,18 @@ impl<TAddr: NodeAddressable + Serialize + DeserializeOwned> StateStoreReadTransa
updates.len()
);

let mut used_substates = HashSet::<ShardId>::new();
let mut processed_substates = HashMap::<TransactionId, HashSet<ShardId>>::new();
let mut used_substates = HashMap::<ShardId, LockFlag>::new();
let mut processed_substates = HashMap::<TransactionId, HashSet<(ShardId, _)>>::with_capacity(updates.len());
for (tx_id, update) in &updates {
if let Some(Decision::Abort) = update
.local_decision
.as_ref()
.map(|decision| parse_from_string(decision.as_str()))
.transpose()?
{
if update.local_decision.as_deref() == Some(Decision::Abort.as_str()) {
// The aborted transaction don't lock any substates
continue;
}
let evidence = deserialize_json::<Evidence>(&update.evidence)?;
let evidence = evidence.shards_iter().copied().collect::<HashSet<_>>();
let evidence = evidence
.iter()
.map(|(shard, evidence)| (*shard, evidence.lock))
.collect::<HashSet<(ShardId, _)>>();
processed_substates.insert(deserialize_hex_try_from(tx_id)?, evidence);
}

Expand All @@ -1206,26 +1205,56 @@ impl<TAddr: NodeAddressable + Serialize + DeserializeOwned> StateStoreReadTransa
let maybe_update = updates.remove(&rec.transaction_id);
match rec.try_convert(maybe_update) {
Ok(rec) => {
if rec.is_ready() {
let tx_substates: HashSet<ShardId> = rec
.transaction()
.evidence
.shards_iter()
.copied()
.collect::<HashSet<_>>();
if tx_substates.is_disjoint(&used_substates) &&
processed_substates.iter().all(|(tx_id, substates)| {
tx_id == rec.transaction_id() || tx_substates.is_disjoint(substates)
})
{
used_substates.extend(tx_substates);
Some(Ok(rec))
if !rec.is_ready() {
return None;
}

let tx_substates = rec
.transaction()
.evidence
.iter()
.map(|(shard, evidence)| (*shard, evidence.lock))
.collect::<HashMap<_, _>>();

// Are there any conflicts between the currently selected set and this transaction?
if tx_substates.iter().any(|(shard, lock)| {
if lock.is_write() {
// Write lock must have no conflicts
used_substates.contains_key(shard)
} else {
// TODO: If we don't switch to "no version" transaction, then we can abort these here.
// That also requires changes to the on_ready_to_vote_on_local_block
None
// If there is a Shard conflict, then it must not be a write lock
used_substates
.get(shard)
.map(|tx_lock| tx_lock.is_write())
.unwrap_or(false)
}
}) {
return None;
}

// Are there any conflicts between this transaction and other transactions to be included in the
// block?
if processed_substates
.iter()
// Check other transactions
.filter(|(tx_id, _)| *tx_id != rec.transaction_id())
.all(|(_, evidence)| {
evidence.iter().all(|(shard, lock)| {
if lock.is_write() {
// Write lock must have no conflicts
!tx_substates.contains_key(shard)
} else {
// If there is a Shard conflict, then it must be a read lock
tx_substates.get(shard).map(|tx_lock| tx_lock.is_read()).unwrap_or(true)
}
})
})
{
used_substates.extend(tx_substates);
Some(Ok(rec))
} else {
// TODO: If we don't switch to "no version" transaction, then we can abort these here.
// That also requires changes to the on_ready_to_vote_on_local_block
None
}
},
Expand Down
1 change: 1 addition & 0 deletions dan_layer/storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ tari_crypto = { workspace = true }

anyhow = { workspace = true }
chrono = { workspace = true }
indexmap = { workspace = true, features = ["serde"] }
log = { workspace = true }
rand = { workspace = true }
thiserror = { workspace = true }
Expand Down
77 changes: 57 additions & 20 deletions dan_layer/storage/src/consensus_models/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@

use std::{
cmp::Ordering,
collections::BTreeMap,
fmt::{Display, Formatter},
};

use indexmap::{IndexMap, IndexSet};
use serde::{Deserialize, Serialize};
use tari_dan_common_types::ShardId;
use tari_engine_types::lock::LockFlag;
use tari_transaction::TransactionId;

use super::ForeignProposal;
Expand All @@ -18,15 +19,15 @@ use crate::{
StorageError,
};

#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct Evidence {
evidence: BTreeMap<ShardId, Vec<QcId>>,
evidence: IndexMap<ShardId, ShardEvidence>,
}

impl Evidence {
pub const fn empty() -> Self {
pub fn empty() -> Self {
Self {
evidence: BTreeMap::new(),
evidence: IndexMap::new(),
}
}

Expand All @@ -43,15 +44,22 @@ impl Evidence {
self.evidence.len()
}

pub fn get(&self, shard_id: &ShardId) -> Option<&ShardEvidence> {
self.evidence.get(shard_id)
}

pub fn num_complete_shards(&self) -> usize {
self.evidence.values().filter(|qc_ids| !qc_ids.is_empty()).count()
self.evidence
.values()
.filter(|evidence| !evidence.qc_ids.is_empty())
.count()
}

pub fn iter(&self) -> impl Iterator<Item = (&ShardId, &Vec<QcId>)> {
pub fn iter(&self) -> impl Iterator<Item = (&ShardId, &ShardEvidence)> {
self.evidence.iter()
}

pub fn iter_mut(&mut self) -> impl Iterator<Item = (&ShardId, &mut Vec<QcId>)> {
pub fn iter_mut(&mut self) -> impl Iterator<Item = (&ShardId, &mut ShardEvidence)> {
self.evidence.iter_mut()
}

Expand All @@ -60,31 +68,60 @@ impl Evidence {
}

pub fn qc_ids_iter(&self) -> impl Iterator<Item = &QcId> + '_ {
self.evidence.values().flatten()
self.evidence.values().flat_map(|e| e.qc_ids.iter())
}

pub fn merge(&mut self, other: Evidence) -> &mut Self {
for (shard_id, qc_ids) in other.evidence {
let entry = self.evidence.entry(shard_id).or_default();
for qc_id in qc_ids {
if !entry.contains(&qc_id) {
entry.push(qc_id);
}
}
for (shard_id, shard_evidence) in other.evidence {
let entry = self.evidence.entry(shard_id).or_insert_with(|| ShardEvidence {
qc_ids: IndexSet::new(),
lock: shard_evidence.lock,
});
entry.qc_ids.extend(shard_evidence.qc_ids);
}
self
}
}

impl FromIterator<(ShardId, Vec<QcId>)> for Evidence {
fn from_iter<T: IntoIterator<Item = (ShardId, Vec<QcId>)>>(iter: T) -> Self {
impl FromIterator<(ShardId, ShardEvidence)> for Evidence {
fn from_iter<T: IntoIterator<Item = (ShardId, ShardEvidence)>>(iter: T) -> Self {
Evidence {
evidence: iter.into_iter().collect(),
}
}
}

#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
impl Extend<(ShardId, ShardEvidence)> for Evidence {
fn extend<T: IntoIterator<Item = (ShardId, ShardEvidence)>>(&mut self, iter: T) {
self.evidence.extend(iter.into_iter())
}
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ShardEvidence {
pub qc_ids: IndexSet<QcId>,
pub lock: LockFlag,
}

impl ShardEvidence {
pub fn new(qc_ids: IndexSet<QcId>, lock: LockFlag) -> Self {
Self { qc_ids, lock }
}

pub fn is_empty(&self) -> bool {
self.qc_ids.is_empty()
}

pub fn contains(&self, qc_id: &QcId) -> bool {
self.qc_ids.contains(qc_id)
}

pub fn insert(&mut self, qc_id: QcId) {
self.qc_ids.insert(qc_id);
}
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TransactionAtom {
pub id: TransactionId,
pub decision: Decision,
Expand Down Expand Up @@ -116,7 +153,7 @@ impl Display for TransactionAtom {
}
}

#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum Command {
/// Command to prepare a transaction.
Prepare(TransactionAtom),
Expand Down
36 changes: 29 additions & 7 deletions dan_layer/storage/src/consensus_models/executed_transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,17 @@ use std::{
time::Duration,
};

use indexmap::IndexSet;
use serde::{Deserialize, Serialize};
use tari_dan_common_types::{optional::Optional, ShardId};
use tari_engine_types::commit_result::{ExecuteResult, FinalizeResult, RejectReason};
use tari_engine_types::{
commit_result::{ExecuteResult, FinalizeResult, RejectReason},
lock::LockFlag,
};
use tari_transaction::{Transaction, TransactionId};

use crate::{
consensus_models::{Decision, Evidence, TransactionAtom, TransactionRecord},
consensus_models::{Decision, Evidence, ShardEvidence, TransactionAtom, TransactionRecord},
StateStoreReadTransaction,
StateStoreWriteTransaction,
StorageError,
Expand Down Expand Up @@ -131,11 +135,29 @@ impl ExecutedTransaction {
}

pub fn to_initial_evidence(&self) -> Evidence {
self.transaction
.all_inputs_iter()
.chain(self.resulting_outputs())
.map(|shard| (*shard, vec![]))
.collect()
let mut evidence = Evidence::empty();
evidence.extend(self.transaction.inputs().iter().map(|input| {
(*input, ShardEvidence {
qc_ids: IndexSet::new(),
lock: LockFlag::Write,
})
}));

evidence.extend(self.transaction.input_refs().iter().map(|input_ref| {
(*input_ref, ShardEvidence {
qc_ids: IndexSet::new(),
lock: LockFlag::Read,
})
}));

evidence.extend(self.resulting_outputs.iter().map(|output| {
(*output, ShardEvidence {
qc_ids: IndexSet::new(),
lock: LockFlag::Write,
})
}));

evidence
}

pub fn is_finalized(&self) -> bool {
Expand Down
Loading
Loading