From 2f532dd0c1f7db02bea86e5d492d0eb428fcfe00 Mon Sep 17 00:00:00 2001 From: Maksim Vykhota Date: Thu, 23 Sep 2021 23:10:50 +0300 Subject: [PATCH] init commit --- .gitignore | 2 + Cargo.toml | 21 + README.MD | 3 + src/error.rs | 70 +++ src/lib.rs | 116 +++++ src/listener.rs | 89 ++++ src/options.rs | 36 ++ src/pool.rs | 756 +++++++++++++++++++++++++++++++ src/ready.rs | 58 +++ src/replace.rs | 55 +++ src/scoring.rs | 189 ++++++++ src/status.rs | 40 ++ src/tests/helpers.rs | 140 ++++++ src/tests/mod.rs | 961 ++++++++++++++++++++++++++++++++++++++++ src/tests/tx_builder.rs | 67 +++ src/transactions.rs | 268 +++++++++++ src/verifier.rs | 31 ++ 17 files changed, 2902 insertions(+) create mode 100644 .gitignore create mode 100644 Cargo.toml create mode 100644 README.MD create mode 100644 src/error.rs create mode 100644 src/lib.rs create mode 100644 src/listener.rs create mode 100644 src/options.rs create mode 100644 src/pool.rs create mode 100644 src/ready.rs create mode 100644 src/replace.rs create mode 100644 src/scoring.rs create mode 100644 src/status.rs create mode 100644 src/tests/helpers.rs create mode 100644 src/tests/mod.rs create mode 100644 src/tests/tx_builder.rs create mode 100644 src/transactions.rs create mode 100644 src/verifier.rs diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..36e2917 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +Cargo.lock +/target \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..db2478e --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,21 @@ +# Copyright 2021 Gnosis Ltd. +# SPDX-License-Identifier: Apache-2.0 + +[package] +name = "txpool" +version = "1.0.0-alpha" +authors = ["Dragan Rakita "] +edition = "2018" +description = "Generic transaction pool." + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[dependencies.log] +version = "0.4" + +[dependencies.smallvec] +version = "0.6" + +[dependencies.trace-time] +version = "0.1" +[dev-dependencies.ethereum-types] +version = "0.7" \ No newline at end of file diff --git a/README.MD b/README.MD new file mode 100644 index 0000000..2d757c5 --- /dev/null +++ b/README.MD @@ -0,0 +1,3 @@ +Generic ethereum transaction pool forked from here: + +https://github.com/openethereum/openethereum/tree/main/crates/transaction-pool diff --git a/src/error.rs b/src/error.rs new file mode 100644 index 0000000..1678c0c --- /dev/null +++ b/src/error.rs @@ -0,0 +1,70 @@ +// Copyright 2015-2018 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +use std::{error, fmt, result}; + +/// Transaction Pool Error +#[derive(Debug)] +pub enum Error { + /// Transaction is already imported + AlreadyImported(Hash), + /// Transaction is too cheap to enter the queue + TooCheapToEnter(Hash, String), + /// Transaction is too cheap to replace existing transaction that occupies the same slot. + TooCheapToReplace(Hash, Hash), +} + +/// Transaction Pool Result +pub type Result = result::Result>; + +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + Error::AlreadyImported(h) => write!(f, "[{:?}] already imported", h), + Error::TooCheapToEnter(hash, min_score) => write!( + f, + "[{:x}] too cheap to enter the pool. Min score: {}", + hash, min_score + ), + Error::TooCheapToReplace(old_hash, hash) => { + write!(f, "[{:x}] too cheap to replace: {:x}", hash, old_hash) + } + } + } +} + +impl error::Error for Error {} + +#[cfg(test)] +impl PartialEq for Error +where + H: PartialEq, +{ + fn eq(&self, other: &Self) -> bool { + use self::Error::*; + + match (self, other) { + (&AlreadyImported(ref h1), &AlreadyImported(ref h2)) => h1 == h2, + (&TooCheapToEnter(ref h1, ref s1), &TooCheapToEnter(ref h2, ref s2)) => { + h1 == h2 && s1 == s2 + } + (&TooCheapToReplace(ref old1, ref new1), &TooCheapToReplace(ref old2, ref new2)) => { + old1 == old2 && new1 == new2 + } + _ => false, + } + } +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..a15be05 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,116 @@ +// Copyright 2015-2018 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Generic Transaction Pool +//! +//! An extensible and performant implementation of Ethereum Transaction Pool. +//! The pool stores ordered, verified transactions according to some pluggable +//! `Scoring` implementation. +//! The pool also allows you to construct a set of `pending` transactions according +//! to some notion of `Readiness` (pluggable). +//! +//! The pool is generic over transactions and should make no assumptions about them. +//! The only thing we can rely on is the `Scoring` that defines: +//! - the ordering of transactions from a single sender +//! - the priority of the transaction compared to other transactions from different senders +//! +//! NOTE: the transactions from a single sender are not ordered by priority, +//! but still when constructing pending set we always need to maintain the ordering +//! (i.e. `txs[1]` always needs to be included after `txs[0]` even if it has higher priority) +//! +//! ### Design Details +//! +//! Performance assumptions: +//! - Possibility to handle tens of thousands of transactions +//! - Fast insertions and replacements `O(per-sender + log(senders))` +//! - Reasonably fast removal of stalled transactions `O(per-sender)` +//! - Reasonably fast construction of pending set `O(txs * (log(senders) + log(per-sender))` +//! +//! The removal performance could be improved by trading some memory. Currently `SmallVec` is used +//! to store senders transactions, instead we could use `VecDeque` and efficiently `pop_front` +//! the best transactions. +//! +//! The pending set construction and insertion complexity could be reduced by introducing +//! a notion of `nonce` - an absolute, numeric ordering of transactions. +//! We don't do that because of possible implications of EIP208 where nonce might not be +//! explicitly available. +//! +//! 1. The pool groups transactions from particular sender together +//! and stores them ordered by `Scoring` within that group +//! i.e. `HashMap>`. +//! 2. Additionaly we maintain the best and the worst transaction from each sender +//! (by `Scoring` not `priority`) ordered by `priority`. +//! It means that we can easily identify the best transaction inside the entire pool +//! and the worst transaction. +//! 3. Whenever new transaction is inserted to the queue: +//! - first check all the limits (overall, memory, per-sender) +//! - retrieve all transactions from a sender +//! - binary search for position to insert the transaction +//! - decide if we are replacing existing transaction (3 outcomes: drop, replace, insert) +//! - update best and worst transaction from that sender if affected +//! 4. Pending List construction: +//! - Take the best transaction (by priority) from all senders to the List +//! - Replace the transaction with next transaction (by ordering) from that sender (if any) +//! - Repeat + +#![warn(missing_docs)] + +#[cfg(test)] +mod tests; + +mod error; +mod listener; +mod options; +mod pool; +mod ready; +mod replace; +mod status; +mod transactions; +mod verifier; + +pub mod scoring; + +pub use self::{ + error::Error, + listener::{Listener, NoopListener}, + options::Options, + pool::{PendingIterator, Pool, Transaction, UnorderedIterator}, + ready::{Readiness, Ready}, + replace::{ReplaceTransaction, ShouldReplace}, + scoring::Scoring, + status::{LightStatus, Status}, + verifier::Verifier, +}; + +use std::{fmt, hash::Hash}; + +/// Already verified transaction that can be safely queued. +pub trait VerifiedTransaction: fmt::Debug { + /// Transaction hash type. + type Hash: fmt::Debug + fmt::LowerHex + Eq + Clone + Hash; + + /// Transaction sender type. + type Sender: fmt::Debug + Eq + Clone + Hash + Send; + + /// Transaction hash + fn hash(&self) -> &Self::Hash; + + /// Memory usage + fn mem_usage(&self) -> usize; + + /// Transaction sender + fn sender(&self) -> &Self::Sender; +} diff --git a/src/listener.rs b/src/listener.rs new file mode 100644 index 0000000..7b090a3 --- /dev/null +++ b/src/listener.rs @@ -0,0 +1,89 @@ +// Copyright 2015-2018 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +use crate::error::Error; +use std::{ + fmt::{Debug, LowerHex}, + sync::Arc, +}; + +/// Transaction pool listener. +/// +/// Listener is being notified about status of every transaction in the pool. +pub trait Listener { + /// The transaction has been successfuly added to the pool. + /// If second argument is `Some` the transaction has took place of some other transaction + /// which was already in pool. + /// NOTE: You won't be notified about drop of `old` transaction separately. + fn added(&mut self, _tx: &Arc, _old: Option<&Arc>) {} + + /// The transaction was rejected from the pool. + /// It means that it was too cheap to replace any transaction already in the pool. + fn rejected(&mut self, _tx: &Arc, _reason: &Error) {} + + /// The transaction was pushed out from the pool because of the limit. + fn dropped(&mut self, _tx: &Arc, _by: Option<&T>) {} + + /// The transaction was marked as invalid by executor. + fn invalid(&mut self, _tx: &Arc) {} + + /// The transaction has been canceled. + fn canceled(&mut self, _tx: &Arc) {} + + /// The transaction has been culled from the pool. + fn culled(&mut self, _tx: &Arc) {} +} + +/// A no-op implementation of `Listener`. +#[derive(Debug)] +pub struct NoopListener; +impl Listener for NoopListener {} + +impl Listener for (A, B) +where + A: Listener, + B: Listener, +{ + fn added(&mut self, tx: &Arc, old: Option<&Arc>) { + self.0.added(tx, old); + self.1.added(tx, old); + } + + fn rejected(&mut self, tx: &Arc, reason: &Error) { + self.0.rejected(tx, reason); + self.1.rejected(tx, reason); + } + + fn dropped(&mut self, tx: &Arc, by: Option<&T>) { + self.0.dropped(tx, by); + self.1.dropped(tx, by); + } + + fn invalid(&mut self, tx: &Arc) { + self.0.invalid(tx); + self.1.invalid(tx); + } + + fn canceled(&mut self, tx: &Arc) { + self.0.canceled(tx); + self.1.canceled(tx); + } + + fn culled(&mut self, tx: &Arc) { + self.0.culled(tx); + self.1.culled(tx); + } +} diff --git a/src/options.rs b/src/options.rs new file mode 100644 index 0000000..6ea46ba --- /dev/null +++ b/src/options.rs @@ -0,0 +1,36 @@ +// Copyright 2015-2018 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +/// Transaction Pool options. +#[derive(Clone, Debug, PartialEq)] +pub struct Options { + /// Maximal number of transactions in the pool. + pub max_count: usize, + /// Maximal number of transactions from single sender. + pub max_per_sender: usize, + /// Maximal memory usage. + pub max_mem_usage: usize, +} + +impl Default for Options { + fn default() -> Self { + Options { + max_count: 1024, + max_per_sender: 16, + max_mem_usage: 8 * 1024 * 1024, + } + } +} diff --git a/src/pool.rs b/src/pool.rs new file mode 100644 index 0000000..8ba64ea --- /dev/null +++ b/src/pool.rs @@ -0,0 +1,756 @@ +// Copyright 2015-2018 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +use log::{trace, warn}; +use std::{ + collections::{hash_map, BTreeSet, HashMap}, + slice, + sync::Arc, +}; + +use crate::{ + error, + listener::{Listener, NoopListener}, + options::Options, + ready::{Readiness, Ready}, + replace::{ReplaceTransaction, ShouldReplace}, + scoring::{self, ScoreWithRef, Scoring}, + status::{LightStatus, Status}, + transactions::{AddResult, Transactions}, + VerifiedTransaction, +}; + +/// Internal representation of transaction. +/// +/// Includes unique insertion id that can be used for scoring explictly, +/// but internally is used to resolve conflicts in case of equal scoring +/// (newer transactionsa are preferred). +#[derive(Debug)] +pub struct Transaction { + /// Sequential id of the transaction + pub insertion_id: u64, + /// Shared transaction + pub transaction: Arc, +} + +impl Clone for Transaction { + fn clone(&self) -> Self { + Transaction { + insertion_id: self.insertion_id, + transaction: self.transaction.clone(), + } + } +} + +impl ::std::ops::Deref for Transaction { + type Target = Arc; + + fn deref(&self) -> &Self::Target { + &self.transaction + } +} +/// A transaction pool. +#[derive(Debug)] +pub struct Pool, L = NoopListener> { + listener: L, + scoring: S, + options: Options, + mem_usage: usize, + + transactions: HashMap>, + by_hash: HashMap>, + + best_transactions: BTreeSet>, + worst_transactions: BTreeSet>, + + insertion_id: u64, +} + +impl + Default> Default for Pool { + fn default() -> Self { + Self::with_scoring(S::default(), Options::default()) + } +} + +impl + Default> Pool { + /// Creates a new `Pool` with given options + /// and default `Scoring` and `Listener`. + pub fn with_options(options: Options) -> Self { + Self::with_scoring(S::default(), options) + } +} + +impl> Pool { + /// Creates a new `Pool` with given `Scoring` and options. + pub fn with_scoring(scoring: S, options: Options) -> Self { + Self::new(NoopListener, scoring, options) + } +} + +const INITIAL_NUMBER_OF_SENDERS: usize = 16; + +impl Pool +where + T: VerifiedTransaction, + S: Scoring, + L: Listener, +{ + /// Creates new `Pool` with given `Scoring`, `Listener` and options. + pub fn new(listener: L, scoring: S, options: Options) -> Self { + let transactions = HashMap::with_capacity(INITIAL_NUMBER_OF_SENDERS); + let by_hash = HashMap::with_capacity(options.max_count / 16); + + Pool { + listener, + scoring, + options, + mem_usage: 0, + transactions, + by_hash, + best_transactions: Default::default(), + worst_transactions: Default::default(), + insertion_id: 0, + } + } + + /// Attempts to import new transaction to the pool, returns a `Arc` or an `Error`. + /// + /// NOTE: Since `Ready`ness is separate from the pool it's possible to import stalled transactions. + /// It's the caller responsibility to make sure that's not the case. + /// + /// NOTE: The transaction may push out some other transactions from the pool + /// either because of limits (see `Options`) or because `Scoring` decides that the transaction + /// replaces an existing transaction from that sender. + /// + /// If any limit is reached the transaction with the lowest `Score` will be compared with the + /// new transaction via the supplied `ShouldReplace` implementation and may be evicted. + /// + /// The `Listener` will be informed on any drops or rejections. + pub fn import( + &mut self, + transaction: T, + replace: &dyn ShouldReplace, + ) -> error::Result, T::Hash> { + let mem_usage = transaction.mem_usage(); + + if self.by_hash.contains_key(transaction.hash()) { + return Err(error::Error::AlreadyImported(transaction.hash().clone())); + } + + self.insertion_id += 1; + let transaction = Transaction { + insertion_id: self.insertion_id, + transaction: Arc::new(transaction), + }; + + // TODO [ToDr] Most likely move this after the transaction is inserted. + // Avoid using should_replace, but rather use scoring for that. + { + let remove_worst = + |s: &mut Self, transaction| match s.remove_worst(transaction, replace) { + Err(err) => { + s.listener.rejected(transaction, &err); + Err(err) + } + Ok(None) => Ok(false), + Ok(Some(removed)) => { + s.listener.dropped(&removed, Some(transaction)); + s.finalize_remove(removed.hash()); + Ok(true) + } + }; + + while self.by_hash.len() + 1 > self.options.max_count { + trace!( + "Count limit reached: {} > {}", + self.by_hash.len() + 1, + self.options.max_count + ); + if !remove_worst(self, &transaction)? { + break; + } + } + + while self.mem_usage + mem_usage > self.options.max_mem_usage { + trace!( + "Mem limit reached: {} > {}", + self.mem_usage + mem_usage, + self.options.max_mem_usage + ); + if !remove_worst(self, &transaction)? { + break; + } + } + } + + let (result, prev_state, current_state) = { + let transactions = self + .transactions + .entry(transaction.sender().clone()) + .or_insert_with(Transactions::default); + // get worst and best transactions for comparison + let prev = transactions.worst_and_best(); + let result = transactions.add(transaction, &self.scoring, self.options.max_per_sender); + let current = transactions.worst_and_best(); + (result, prev, current) + }; + + // update best and worst transactions from this sender (if required) + self.update_senders_worst_and_best(prev_state, current_state); + + match result { + AddResult::Ok(tx) => { + self.listener.added(&tx, None); + self.finalize_insert(&tx, None); + Ok(tx.transaction) + } + AddResult::PushedOut { new, old } | AddResult::Replaced { new, old } => { + self.listener.added(&new, Some(&old)); + self.finalize_insert(&new, Some(&old)); + Ok(new.transaction) + } + AddResult::TooCheap { new, old } => { + let error = error::Error::TooCheapToReplace(old.hash().clone(), new.hash().clone()); + self.listener.rejected(&new, &error); + return Err(error); + } + AddResult::TooCheapToEnter(new, score) => { + let error = + error::Error::TooCheapToEnter(new.hash().clone(), format!("{:#x}", score)); + self.listener.rejected(&new, &error); + return Err(error); + } + } + } + + /// Updates state of the pool statistics if the transaction was added to a set. + fn finalize_insert(&mut self, new: &Transaction, old: Option<&Transaction>) { + self.mem_usage += new.mem_usage(); + self.by_hash.insert(new.hash().clone(), new.clone()); + + if let Some(old) = old { + self.finalize_remove(old.hash()); + } + } + + /// Updates the pool statistics if transaction was removed. + fn finalize_remove(&mut self, hash: &T::Hash) -> Option> { + self.by_hash.remove(hash).map(|old| { + self.mem_usage -= old.transaction.mem_usage(); + old.transaction + }) + } + + /// Updates best and worst transactions from a sender. + fn update_senders_worst_and_best( + &mut self, + previous: Option<((S::Score, Transaction), (S::Score, Transaction))>, + current: Option<((S::Score, Transaction), (S::Score, Transaction))>, + ) { + let worst_collection = &mut self.worst_transactions; + let best_collection = &mut self.best_transactions; + + let is_same = |a: &(S::Score, Transaction), b: &(S::Score, Transaction)| { + a.0 == b.0 && a.1.hash() == b.1.hash() + }; + + let update = |collection: &mut BTreeSet<_>, (score, tx), remove| { + if remove { + collection.remove(&ScoreWithRef::new(score, tx)); + } else { + collection.insert(ScoreWithRef::new(score, tx)); + } + }; + + match (previous, current) { + (None, Some((worst, best))) => { + update(worst_collection, worst, false); + update(best_collection, best, false); + } + (Some((worst, best)), None) => { + // all transactions from that sender has been removed. + // We can clear a hashmap entry. + self.transactions.remove(worst.1.sender()); + update(worst_collection, worst, true); + update(best_collection, best, true); + } + (Some((w1, b1)), Some((w2, b2))) => { + if !is_same(&w1, &w2) { + update(worst_collection, w1, true); + update(worst_collection, w2, false); + } + if !is_same(&b1, &b2) { + update(best_collection, b1, true); + update(best_collection, b2, false); + } + } + (None, None) => {} + } + } + + /// Attempts to remove the worst transaction from the pool if it's worse than the given one. + /// + /// Returns `None` in case we couldn't decide if the transaction should replace the worst transaction or not. + /// In such case we will accept the transaction even though it is going to exceed the limit. + fn remove_worst( + &mut self, + transaction: &Transaction, + replace: &dyn ShouldReplace, + ) -> error::Result>, T::Hash> { + let to_remove = match self.worst_transactions.iter().next_back() { + // No elements to remove? and the pool is still full? + None => { + warn!("The pool is full but there are no transactions to remove."); + return Err(error::Error::TooCheapToEnter( + transaction.hash().clone(), + "unknown".into(), + )); + } + Some(old) => { + let txs = &self.transactions; + let get_replace_tx = |tx| { + let sender_txs = txs + .get(transaction.sender()) + .map(|txs| txs.iter_transactions().as_slice()); + ReplaceTransaction::new(tx, sender_txs) + }; + let old_replace = get_replace_tx(&old.transaction); + let new_replace = get_replace_tx(transaction); + + match replace.should_replace(&old_replace, &new_replace) { + // We can't decide which of them should be removed, so accept both. + scoring::Choice::InsertNew => None, + // New transaction is better than the worst one so we can replace it. + scoring::Choice::ReplaceOld => Some(old.clone()), + // otherwise fail + scoring::Choice::RejectNew => { + return Err(error::Error::TooCheapToEnter( + transaction.hash().clone(), + format!("{:#x}", old.score), + )) + } + } + } + }; + + if let Some(to_remove) = to_remove { + // Remove from transaction set + self.remove_from_set(to_remove.transaction.sender(), |set, scoring| { + set.remove(&to_remove.transaction, scoring) + }); + + Ok(Some(to_remove.transaction)) + } else { + Ok(None) + } + } + + /// Removes transaction from sender's transaction `HashMap`. + fn remove_from_set, &S) -> R>( + &mut self, + sender: &T::Sender, + f: F, + ) -> Option { + let (prev, next, result) = if let Some(set) = self.transactions.get_mut(sender) { + let prev = set.worst_and_best(); + let result = f(set, &self.scoring); + (prev, set.worst_and_best(), result) + } else { + return None; + }; + + self.update_senders_worst_and_best(prev, next); + Some(result) + } + + /// Clears pool from all transactions. + /// This causes a listener notification that all transactions were dropped. + /// NOTE: the drop-notification order will be arbitrary. + pub fn clear(&mut self) { + self.mem_usage = 0; + self.transactions.clear(); + self.best_transactions.clear(); + self.worst_transactions.clear(); + + for (_hash, tx) in self.by_hash.drain() { + self.listener.dropped(&tx.transaction, None) + } + } + + /// Removes single transaction from the pool. + /// Depending on the `is_invalid` flag the listener + /// will either get a `cancelled` or `invalid` notification. + pub fn remove(&mut self, hash: &T::Hash, is_invalid: bool) -> Option> { + if let Some(tx) = self.finalize_remove(hash) { + self.remove_from_set(tx.sender(), |set, scoring| set.remove(&tx, scoring)); + if is_invalid { + self.listener.invalid(&tx); + } else { + self.listener.canceled(&tx); + } + Some(tx) + } else { + None + } + } + + /// Removes all stalled transactions from given sender. + fn remove_stalled>(&mut self, sender: &T::Sender, ready: &mut R) -> usize { + let removed_from_set = self.remove_from_set(sender, |transactions, scoring| { + transactions.cull(ready, scoring) + }); + + match removed_from_set { + Some(removed) => { + let len = removed.len(); + for tx in removed { + self.finalize_remove(tx.hash()); + self.listener.culled(&tx); + } + len + } + None => 0, + } + } + + /// Removes all stalled transactions from given sender list (or from all senders). + pub fn cull>(&mut self, senders: Option<&[T::Sender]>, mut ready: R) -> usize { + let mut removed = 0; + match senders { + Some(senders) => { + for sender in senders { + removed += self.remove_stalled(sender, &mut ready); + } + } + None => { + let senders = self.transactions.keys().cloned().collect::>(); + for sender in senders { + removed += self.remove_stalled(&sender, &mut ready); + } + } + } + + removed + } + + /// Returns a transaction if it's part of the pool or `None` otherwise. + pub fn find(&self, hash: &T::Hash) -> Option> { + self.by_hash.get(hash).map(|t| t.transaction.clone()) + } + + /// Returns worst transaction in the queue (if any). + pub fn worst_transaction(&self) -> Option> { + self.worst_transactions + .iter() + .next_back() + .map(|x| x.transaction.transaction.clone()) + } + + /// Returns true if the pool is at it's capacity. + pub fn is_full(&self) -> bool { + self.by_hash.len() >= self.options.max_count || self.mem_usage >= self.options.max_mem_usage + } + + /// Returns senders ordered by priority of their transactions. + pub fn senders(&self) -> impl Iterator { + self.best_transactions + .iter() + .map(|tx| tx.transaction.sender()) + } + + /// Returns an iterator of pending (ready) transactions. + pub fn pending>( + &self, + ready: R, + includable_boundary: S::Score, + ) -> PendingIterator { + PendingIterator { + ready, + best_transactions: self.best_transactions.clone(), + pool: self, + includable_boundary, + } + } + + /// Returns pending (ready) transactions from given sender. + pub fn pending_from_sender>( + &self, + ready: R, + sender: &T::Sender, + includable_boundary: S::Score, + ) -> PendingIterator { + let best_transactions = self + .transactions + .get(sender) + .and_then(|transactions| transactions.worst_and_best()) + .map(|(_, best)| ScoreWithRef::new(best.0, best.1)) + .map(|s| { + let mut set = BTreeSet::new(); + set.insert(s); + set + }) + .unwrap_or_default(); + + PendingIterator { + ready, + best_transactions, + pool: self, + includable_boundary, + } + } + + /// Returns unprioritized list of ready transactions. + pub fn unordered_pending>( + &self, + ready: R, + includable_boundary: S::Score, + ) -> UnorderedIterator { + UnorderedIterator { + ready, + senders: self.transactions.iter(), + transactions: None, + scores: None, + includable_boundary, + } + } + + /// Update score of transactions of a particular sender. + pub fn update_scores(&mut self, sender: &T::Sender, event: S::Event) { + let res = if let Some(set) = self.transactions.get_mut(sender) { + let prev = set.worst_and_best(); + set.update_scores(&self.scoring, event); + let current = set.worst_and_best(); + Some((prev, current)) + } else { + None + }; + + if let Some((prev, current)) = res { + self.update_senders_worst_and_best(prev, current); + } + } + + /// Update score of transactions of all senders + fn update_all_scores(&mut self, event: S::Event) { + let senders = self.transactions.keys().cloned().collect::>(); + for sender in senders { + self.update_scores(&sender, event); + } + } + + /// Computes the full status of the pool (including readiness). + pub fn status>(&self, mut ready: R) -> Status { + let mut status = Status::default(); + + for (_sender, transactions) in &self.transactions { + let len = transactions.len(); + for (idx, tx) in transactions.iter_transactions().enumerate() { + match ready.is_ready(tx) { + Readiness::Stale => status.stalled += 1, + Readiness::Ready => status.pending += 1, + Readiness::Future => { + status.future += len - idx; + break; + } + } + } + } + + status + } + + /// Returns light status of the pool. + pub fn light_status(&self) -> LightStatus { + LightStatus { + mem_usage: self.mem_usage, + transaction_count: self.by_hash.len(), + senders: self.transactions.len(), + } + } + + /// Returns current pool options. + pub fn options(&self) -> Options { + self.options.clone() + } + + /// Borrows listener instance. + pub fn listener(&self) -> &L { + &self.listener + } + + /// Borrows scoring instance. + pub fn scoring(&self) -> &S { + &self.scoring + } + + /// Set scoring instance. + pub fn set_scoring(&mut self, scoring: S, event: S::Event) { + self.scoring = scoring; + self.update_all_scores(event); + } + + /// Borrows listener mutably. + pub fn listener_mut(&mut self) -> &mut L { + &mut self.listener + } +} + +/// An iterator over all pending (ready) transactions in unoredered fashion. +/// +/// NOTE: Current implementation will iterate over all transactions from particular sender +/// ordered by nonce, but that might change in the future. +/// +/// NOTE: the transactions are not removed from the queue. +/// You might remove them later by calling `cull`. +/// +/// Note: includable_boundary is used to return only the subgroup of pending transaction with score greater or equal +/// to includable_boundary. If not needed, set includable_boundary to zero. +pub struct UnorderedIterator<'a, T, R, S> +where + T: VerifiedTransaction + 'a, + S: Scoring + 'a, +{ + ready: R, + senders: hash_map::Iter<'a, T::Sender, Transactions>, + transactions: Option>>, + scores: Option>, + includable_boundary: S::Score, +} + +impl<'a, T, R, S> Iterator for UnorderedIterator<'a, T, R, S> +where + T: VerifiedTransaction, + R: Ready, + S: Scoring, +{ + type Item = Arc; + + fn next(&mut self) -> Option { + // iterate through each sender + loop { + if let Some(transactions) = self.transactions.as_mut() { + if let Some(scores) = self.scores.as_mut() { + // iterate through each transaction from one sender + loop { + if let Some(tx) = transactions.next() { + if let Some(score) = scores.next() { + match self.ready.is_ready(&tx) { + Readiness::Ready => { + //return transaction with score higher or equal to desired + if score >= &self.includable_boundary { + return Some(tx.transaction.clone()); + } + } + state => { + trace!( + "[{:?}] Ignoring {:?} transaction.", + tx.hash(), + state + ) + } + } + } + } else { + break; + } + } + } + } + + // otherwise fallback and try next sender + let next_sender = self.senders.next()?; + self.transactions = Some(next_sender.1.iter_transactions()); + self.scores = Some(next_sender.1.iter_scores()); + } + } +} + +/// An iterator over all pending (ready) transactions. +/// NOTE: the transactions are not removed from the queue. +/// You might remove them later by calling `cull`. +/// +/// Note: includable_boundary is used to return only the subgroup of pending transaction with score greater or equal +/// to includable_boundary. If not needed, set includable_boundary to zero. +pub struct PendingIterator<'a, T, R, S, L> +where + T: VerifiedTransaction + 'a, + S: Scoring + 'a, + L: 'a, +{ + ready: R, + best_transactions: BTreeSet>, + pool: &'a Pool, + includable_boundary: S::Score, +} + +impl<'a, T, R, S, L> Iterator for PendingIterator<'a, T, R, S, L> +where + T: VerifiedTransaction, + R: Ready, + S: Scoring, +{ + type Item = Arc; + + fn next(&mut self) -> Option { + while !self.best_transactions.is_empty() { + let best = { + let best = self + .best_transactions + .iter() + .next() + .expect("current_best is not empty; qed") + .clone(); + self.best_transactions + .take(&best) + .expect("Just taken from iterator; qed") + }; + + let tx_state = self.ready.is_ready(&best.transaction); + // Add the next best sender's transaction when applicable + match tx_state { + Readiness::Ready | Readiness::Stale => { + // retrieve next one from the same sender. + let next = self + .pool + .transactions + .get(best.transaction.sender()) + .and_then(|s| s.find_next(&best.transaction, &self.pool.scoring)); + if let Some((score, tx)) = next { + self.best_transactions.insert(ScoreWithRef::new(score, tx)); + } + } + _ => (), + } + + if tx_state == Readiness::Ready { + //return transaction with score higher or equal to desired + if best.score >= self.includable_boundary { + return Some(best.transaction.transaction); + } + } + + trace!( + "[{:?}] Ignoring {:?} transaction. Score: {:?}, includable boundary: {:?}", + best.transaction.hash(), + tx_state, + best.score, + self.includable_boundary + ); + } + + None + } +} diff --git a/src/ready.rs b/src/ready.rs new file mode 100644 index 0000000..aa5baee --- /dev/null +++ b/src/ready.rs @@ -0,0 +1,58 @@ +// Copyright 2015-2018 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +/// Transaction readiness. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum Readiness { + /// The transaction is stale (and should/will be removed from the pool). + Stale, + /// The transaction is ready to be included in pending set. + Ready, + /// The transaction is not yet ready. + Future, +} + +/// A readiness indicator. +pub trait Ready { + /// Returns true if transaction is ready to be included in pending block, + /// given all previous transactions that were ready are already included. + /// + /// NOTE: readiness of transactions will be checked according to `Score` ordering, + /// the implementation should maintain a state of already checked transactions. + fn is_ready(&mut self, tx: &T) -> Readiness; +} + +impl Ready for F +where + F: FnMut(&T) -> Readiness, +{ + fn is_ready(&mut self, tx: &T) -> Readiness { + (*self)(tx) + } +} + +impl Ready for (A, B) +where + A: Ready, + B: Ready, +{ + fn is_ready(&mut self, tx: &T) -> Readiness { + match self.0.is_ready(tx) { + Readiness::Ready => self.1.is_ready(tx), + r => r, + } + } +} diff --git a/src/replace.rs b/src/replace.rs new file mode 100644 index 0000000..daed28f --- /dev/null +++ b/src/replace.rs @@ -0,0 +1,55 @@ +// Copyright 2015-2019 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! When queue limits are reached, decide whether to replace an existing transaction from the pool + +use crate::{pool::Transaction, scoring::Choice}; + +/// Encapsulates a transaction to be compared, along with pooled transactions from the same sender +pub struct ReplaceTransaction<'a, T> { + /// The transaction to be compared for replacement + pub transaction: &'a Transaction, + /// Other transactions currently in the pool for the same sender + pub pooled_by_sender: Option<&'a [Transaction]>, +} + +impl<'a, T> ReplaceTransaction<'a, T> { + /// Creates a new `ReplaceTransaction` + pub fn new( + transaction: &'a Transaction, + pooled_by_sender: Option<&'a [Transaction]>, + ) -> Self { + ReplaceTransaction { + transaction, + pooled_by_sender, + } + } +} + +impl<'a, T> ::std::ops::Deref for ReplaceTransaction<'a, T> { + type Target = Transaction; + fn deref(&self) -> &Self::Target { + &self.transaction + } +} + +/// Chooses whether a new transaction should replace an existing transaction if the pool is full. +pub trait ShouldReplace { + /// Decides if `new` should push out `old` transaction from the pool. + /// + /// NOTE returning `InsertNew` here can lead to some transactions being accepted above pool limits. + fn should_replace(&self, old: &ReplaceTransaction, new: &ReplaceTransaction) -> Choice; +} diff --git a/src/scoring.rs b/src/scoring.rs new file mode 100644 index 0000000..177cc1e --- /dev/null +++ b/src/scoring.rs @@ -0,0 +1,189 @@ +// Copyright 2015-2018 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! A transactions ordering abstraction. + +use crate::pool::Transaction; +use std::{cmp, fmt}; + +/// Represents a decision what to do with +/// a new transaction that tries to enter the pool. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum Choice { + /// New transaction should be rejected + /// (i.e. the old transaction that occupies the same spot + /// is better). + RejectNew, + /// The old transaction should be dropped + /// in favour of the new one. + ReplaceOld, + /// The new transaction should be inserted + /// and both (old and new) should stay in the pool. + InsertNew, +} + +/// Describes a reason why the `Score` of transactions +/// should be updated. +/// The `Scoring` implementations can use this information +/// to update the `Score` table more efficiently. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum Change { + /// New transaction has been inserted at given index. + /// The Score at that index is initialized with default value + /// and needs to be filled in. + InsertedAt(usize), + /// The transaction has been removed at given index and other transactions + /// shifted to it's place. + /// The scores were removed and shifted as well. + /// For simple scoring algorithms no action is required here. + RemovedAt(usize), + /// The transaction at given index has replaced a previous transaction. + /// The score at that index needs to be update (it contains value from previous transaction). + ReplacedAt(usize), + /// Given number of stalled transactions has been culled from the beginning. + /// The scores has been removed from the beginning as well. + /// For simple scoring algorithms no action is required here. + Culled(usize), + /// Custom event to update the score triggered outside of the pool. + /// Handling this event is up to scoring implementation. + Event(T), +} + +/// A transaction ordering. +/// +/// The implementation should decide on order of transactions in the pool. +/// Each transaction should also get assigned a `Score` which is used to later +/// prioritize transactions in the pending set. +/// +/// Implementation notes: +/// - Returned `Score`s should match ordering of `compare` method. +/// - `compare` will be called only within a context of transactions from the same sender. +/// - `choose` may be called even if `compare` returns `Ordering::Equal` +/// - `Score`s and `compare` should align with `Ready` implementation. +/// +/// Example: Natural ordering of Ethereum transactions. +/// - `compare`: compares transaction `nonce` () +/// - `choose`: compares transactions `gasPrice` (decides if old transaction should be replaced) +/// - `update_scores`: score defined as `gasPrice` if `n==0` and `max(scores[n-1], gasPrice)` if `n>0` +/// +pub trait Scoring: fmt::Debug { + /// A score of a transaction. + type Score: cmp::Ord + Clone + Default + fmt::Debug + Send + fmt::LowerHex; + /// Custom scoring update event type. + type Event: fmt::Debug + Copy; + + /// Decides on ordering of `T`s from a particular sender. + fn compare(&self, old: &T, other: &T) -> cmp::Ordering; + + /// Decides how to deal with two transactions from a sender that seem to occupy the same slot in the queue. + fn choose(&self, old: &T, new: &T) -> Choice; + + /// Updates the transaction scores given a list of transactions and a change to previous scoring. + /// NOTE: you can safely assume that both slices have the same length. + /// (i.e. score at index `i` represents transaction at the same index) + fn update_scores( + &self, + txs: &[Transaction], + scores: &mut [Self::Score], + change: Change, + ); + + /// Decides if the transaction should ignore per-sender limit in the pool. + /// + /// If you return `true` for given transaction it's going to be accepted even though + /// the per-sender limit is exceeded. + fn should_ignore_sender_limit(&self, _new: &T) -> bool { + false + } +} + +/// A score with a reference to the transaction. +#[derive(Debug)] +pub struct ScoreWithRef { + /// Score + pub score: S, + /// Shared transaction + pub transaction: Transaction, +} + +impl ScoreWithRef { + /// Creates a new `ScoreWithRef` + pub fn new(score: S, transaction: Transaction) -> Self { + ScoreWithRef { score, transaction } + } +} + +impl Clone for ScoreWithRef { + fn clone(&self) -> Self { + ScoreWithRef { + score: self.score.clone(), + transaction: self.transaction.clone(), + } + } +} + +impl Ord for ScoreWithRef { + fn cmp(&self, other: &Self) -> cmp::Ordering { + other.score.cmp(&self.score).then( + self.transaction + .insertion_id + .cmp(&other.transaction.insertion_id), + ) + } +} + +impl PartialOrd for ScoreWithRef { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl PartialEq for ScoreWithRef { + fn eq(&self, other: &Self) -> bool { + self.score == other.score && self.transaction.insertion_id == other.transaction.insertion_id + } +} + +impl Eq for ScoreWithRef {} + +#[cfg(test)] +mod tests { + use super::*; + + fn score(score: u64, insertion_id: u64) -> ScoreWithRef<(), u64> { + ScoreWithRef { + score, + transaction: Transaction { + insertion_id, + transaction: Default::default(), + }, + } + } + + #[test] + fn scoring_comparison() { + // the higher the score the better + assert_eq!(score(10, 0).cmp(&score(0, 0)), cmp::Ordering::Less); + assert_eq!(score(0, 0).cmp(&score(10, 0)), cmp::Ordering::Greater); + + // equal is equal + assert_eq!(score(0, 0).cmp(&score(0, 0)), cmp::Ordering::Equal); + + // lower insertion id is better + assert_eq!(score(0, 0).cmp(&score(0, 10)), cmp::Ordering::Less); + assert_eq!(score(0, 10).cmp(&score(0, 0)), cmp::Ordering::Greater); + } +} diff --git a/src/status.rs b/src/status.rs new file mode 100644 index 0000000..b423ee5 --- /dev/null +++ b/src/status.rs @@ -0,0 +1,40 @@ +// Copyright 2015-2018 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +/// Light pool status. +/// This status is cheap to compute and can be called frequently. +#[derive(Default, Debug, Clone, PartialEq, Eq)] +pub struct LightStatus { + /// Memory usage in bytes. + pub mem_usage: usize, + /// Total number of transactions in the pool. + pub transaction_count: usize, + /// Number of unique senders in the pool. + pub senders: usize, +} + +/// A full queue status. +/// To compute this status it is required to provide `Ready`. +/// NOTE: To compute the status we need to visit each transaction in the pool. +#[derive(Default, Debug, Clone, PartialEq, Eq)] +pub struct Status { + /// Number of stalled transactions. + pub stalled: usize, + /// Number of pending (ready) transactions. + pub pending: usize, + /// Number of future (not ready) transactions. + pub future: usize, +} diff --git a/src/tests/helpers.rs b/src/tests/helpers.rs new file mode 100644 index 0000000..9a4291e --- /dev/null +++ b/src/tests/helpers.rs @@ -0,0 +1,140 @@ +// Copyright 2015-2018 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +use std::{cmp, collections::HashMap}; + +use super::Transaction; +use crate::{pool, scoring, Readiness, Ready, ReplaceTransaction, Scoring, ShouldReplace}; +use ethereum_types::{H160 as Sender, U256}; + +#[derive(Debug, PartialEq, Copy, Clone)] +pub enum DummyScoringEvent { + /// Penalize transactions + Penalize, + /// Update scores to the gas price + UpdateScores, +} +#[derive(Debug, Default)] +pub struct DummyScoring { + always_insert: bool, +} + +impl DummyScoring { + pub fn always_insert() -> Self { + DummyScoring { + always_insert: true, + } + } +} + +impl Scoring for DummyScoring { + type Score = U256; + type Event = DummyScoringEvent; + + fn compare(&self, old: &Transaction, new: &Transaction) -> cmp::Ordering { + old.nonce.cmp(&new.nonce) + } + + fn choose(&self, old: &Transaction, new: &Transaction) -> scoring::Choice { + if old.nonce == new.nonce { + if new.gas_price > old.gas_price { + scoring::Choice::ReplaceOld + } else { + scoring::Choice::RejectNew + } + } else { + scoring::Choice::InsertNew + } + } + + fn update_scores( + &self, + txs: &[pool::Transaction], + scores: &mut [Self::Score], + change: scoring::Change, + ) { + match change { + scoring::Change::Event(event) => { + match event { + DummyScoringEvent::Penalize => { + println!("entered"); + // In case of penalize reset all scores to 0 + for i in 0..txs.len() { + scores[i] = 0.into(); + } + } + DummyScoringEvent::UpdateScores => { + // Set to a gas price otherwise + for i in 0..txs.len() { + scores[i] = txs[i].gas_price; + } + } + } + } + scoring::Change::InsertedAt(index) | scoring::Change::ReplacedAt(index) => { + scores[index] = txs[index].gas_price; + } + scoring::Change::RemovedAt(_) => {} + scoring::Change::Culled(_) => {} + } + } + + fn should_ignore_sender_limit(&self, _new: &Transaction) -> bool { + self.always_insert + } +} + +impl ShouldReplace for DummyScoring { + fn should_replace( + &self, + old: &ReplaceTransaction, + new: &ReplaceTransaction, + ) -> scoring::Choice { + if self.always_insert { + scoring::Choice::InsertNew + } else if new.gas_price > old.gas_price { + scoring::Choice::ReplaceOld + } else { + scoring::Choice::RejectNew + } + } +} + +#[derive(Default)] +pub struct NonceReady(HashMap, U256); + +impl NonceReady { + pub fn new>(min: T) -> Self { + let mut n = NonceReady::default(); + n.1 = min.into(); + n + } +} + +impl Ready for NonceReady { + fn is_ready(&mut self, tx: &Transaction) -> Readiness { + let min = self.1; + let nonce = self.0.entry(tx.sender).or_insert_with(|| min); + match tx.nonce.cmp(nonce) { + cmp::Ordering::Greater => Readiness::Future, + cmp::Ordering::Equal => { + *nonce += 1.into(); + Readiness::Ready + } + cmp::Ordering::Less => Readiness::Stale, + } + } +} diff --git a/src/tests/mod.rs b/src/tests/mod.rs new file mode 100644 index 0000000..5ba235d --- /dev/null +++ b/src/tests/mod.rs @@ -0,0 +1,961 @@ +// Copyright 2015-2018 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +mod helpers; +mod tx_builder; + +use self::{ + helpers::{DummyScoring, NonceReady}, + tx_builder::TransactionBuilder, +}; + +use std::sync::Arc; + +use super::*; +use ethereum_types::{Address, H256, U256}; + +#[derive(Debug, PartialEq)] +pub struct Transaction { + pub hash: H256, + pub nonce: U256, + pub gas_price: U256, + pub gas: U256, + pub sender: Address, + pub mem_usage: usize, +} + +impl VerifiedTransaction for Transaction { + type Hash = H256; + type Sender = Address; + + fn hash(&self) -> &H256 { + &self.hash + } + fn mem_usage(&self) -> usize { + self.mem_usage + } + fn sender(&self) -> &Address { + &self.sender + } +} + +pub type SharedTransaction = Arc; + +type TestPool = Pool; + +impl TestPool { + pub fn with_limit(max_count: usize) -> Self { + Self::with_options(Options { + max_count, + ..Default::default() + }) + } +} + +fn import, L: Listener>( + txq: &mut Pool, + tx: Transaction, +) -> Result, Error<::Hash>> { + txq.import(tx, &mut DummyScoring::default()) +} + +#[test] +fn should_clear_queue() { + // given + let b = TransactionBuilder::default(); + let mut txq = TestPool::default(); + assert_eq!( + txq.light_status(), + LightStatus { + mem_usage: 0, + transaction_count: 0, + senders: 0, + } + ); + let tx1 = b.tx().nonce(0).new(); + let tx2 = b.tx().nonce(1).mem_usage(1).new(); + + // add + import(&mut txq, tx1).unwrap(); + import(&mut txq, tx2).unwrap(); + assert_eq!( + txq.light_status(), + LightStatus { + mem_usage: 1, + transaction_count: 2, + senders: 1, + } + ); + + // when + txq.clear(); + + // then + assert_eq!( + txq.light_status(), + LightStatus { + mem_usage: 0, + transaction_count: 0, + senders: 0, + } + ); +} + +#[test] +fn should_not_allow_same_transaction_twice() { + // given + let b = TransactionBuilder::default(); + let mut txq = TestPool::default(); + let tx1 = b.tx().nonce(0).new(); + let tx2 = b.tx().nonce(0).new(); + + // when + import(&mut txq, tx1).unwrap(); + import(&mut txq, tx2).unwrap_err(); + + // then + assert_eq!(txq.light_status().transaction_count, 1); +} + +#[test] +fn should_replace_transaction() { + // given + let b = TransactionBuilder::default(); + let mut txq = TestPool::default(); + let tx1 = b.tx().nonce(0).gas_price(1).new(); + let tx2 = b.tx().nonce(0).gas_price(2).new(); + + // when + import(&mut txq, tx1).unwrap(); + import(&mut txq, tx2).unwrap(); + + // then + assert_eq!(txq.light_status().transaction_count, 1); +} + +#[test] +fn should_reject_if_above_count() { + let b = TransactionBuilder::default(); + let mut txq = TestPool::with_options(Options { + max_count: 1, + ..Default::default() + }); + + // Reject second + let tx1 = b.tx().nonce(0).new(); + let tx2 = b.tx().nonce(1).new(); + let hash = tx2.hash.clone(); + import(&mut txq, tx1).unwrap(); + assert_eq!( + import(&mut txq, tx2).unwrap_err(), + error::Error::TooCheapToEnter(hash, "0x0".into()) + ); + assert_eq!(txq.light_status().transaction_count, 1); + + txq.clear(); + + // Replace first + let tx1 = b.tx().nonce(0).new(); + let tx2 = b.tx().nonce(0).sender(1).gas_price(2).new(); + import(&mut txq, tx1).unwrap(); + import(&mut txq, tx2).unwrap(); + assert_eq!(txq.light_status().transaction_count, 1); +} + +#[test] +fn should_reject_if_above_mem_usage() { + let b = TransactionBuilder::default(); + let mut txq = TestPool::with_options(Options { + max_mem_usage: 1, + ..Default::default() + }); + + // Reject second + let tx1 = b.tx().nonce(1).mem_usage(1).new(); + let tx2 = b.tx().nonce(2).mem_usage(2).new(); + let hash = tx2.hash.clone(); + import(&mut txq, tx1).unwrap(); + assert_eq!( + import(&mut txq, tx2).unwrap_err(), + error::Error::TooCheapToEnter(hash, "0x0".into()) + ); + assert_eq!(txq.light_status().transaction_count, 1); + + txq.clear(); + + // Replace first + let tx1 = b.tx().nonce(1).mem_usage(1).new(); + let tx2 = b.tx().nonce(1).sender(1).gas_price(2).mem_usage(1).new(); + import(&mut txq, tx1).unwrap(); + import(&mut txq, tx2).unwrap(); + assert_eq!(txq.light_status().transaction_count, 1); +} + +#[test] +fn should_reject_if_above_sender_count() { + let b = TransactionBuilder::default(); + let mut txq = TestPool::with_options(Options { + max_per_sender: 1, + ..Default::default() + }); + + // Reject second + let tx1 = b.tx().nonce(1).new(); + let tx2 = b.tx().nonce(2).new(); + let hash = tx2.hash.clone(); + import(&mut txq, tx1).unwrap(); + assert_eq!( + import(&mut txq, tx2).unwrap_err(), + error::Error::TooCheapToEnter(hash, "0x0".into()) + ); + assert_eq!(txq.light_status().transaction_count, 1); + + txq.clear(); + + // Replace first + let tx1 = b.tx().nonce(1).new(); + let tx2 = b.tx().nonce(2).gas_price(2).new(); + let hash = tx2.hash.clone(); + import(&mut txq, tx1).unwrap(); + // This results in error because we also compare nonces + assert_eq!( + import(&mut txq, tx2).unwrap_err(), + error::Error::TooCheapToEnter(hash, "0x0".into()) + ); + assert_eq!(txq.light_status().transaction_count, 1); +} + +#[test] +fn should_construct_pending() { + // given + let b = TransactionBuilder::default(); + let mut txq = TestPool::default(); + + let tx0 = import(&mut txq, b.tx().nonce(0).gas_price(5).new()).unwrap(); + let tx1 = import(&mut txq, b.tx().nonce(1).gas_price(5).new()).unwrap(); + + let tx9 = import(&mut txq, b.tx().sender(2).nonce(0).new()).unwrap(); + + let tx5 = import(&mut txq, b.tx().sender(1).nonce(0).new()).unwrap(); + let tx6 = import(&mut txq, b.tx().sender(1).nonce(1).new()).unwrap(); + let tx7 = import(&mut txq, b.tx().sender(1).nonce(2).new()).unwrap(); + let tx8 = import(&mut txq, b.tx().sender(1).nonce(3).gas_price(4).new()).unwrap(); + + let tx2 = import(&mut txq, b.tx().nonce(2).new()).unwrap(); + // this transaction doesn't get to the block despite high gas price + // because of block gas limit and simplistic ordering algorithm. + let tx3 = import(&mut txq, b.tx().nonce(3).gas_price(4).new()).unwrap(); + //gap + import(&mut txq, b.tx().nonce(5).new()).unwrap(); + + // gap + import(&mut txq, b.tx().sender(1).nonce(5).new()).unwrap(); + + assert_eq!(txq.light_status().transaction_count, 11); + assert_eq!( + txq.status(NonceReady::default()), + Status { + stalled: 0, + pending: 9, + future: 2, + } + ); + assert_eq!( + txq.status(NonceReady::new(1)), + Status { + stalled: 3, + pending: 6, + future: 2, + } + ); + + // get only includable part of the pending transactions + let mut includable = txq.pending(NonceReady::default(), U256::from(4)); + + assert_eq!(includable.next(), Some(tx0.clone())); + assert_eq!(includable.next(), Some(tx1.clone())); + assert_eq!(includable.next(), Some(tx8.clone())); + assert_eq!(includable.next(), Some(tx3.clone())); + assert_eq!(includable.next(), None); + + // get all pending transactions + let mut current_gas = U256::zero(); + let limit = (21_000 * 8).into(); + let mut pending = txq + .pending(NonceReady::default(), U256::from(0)) + .take_while(|tx| { + let should_take = tx.gas + current_gas <= limit; + if should_take { + current_gas = current_gas + tx.gas + } + should_take + }); + + assert_eq!(pending.next(), Some(tx0)); + assert_eq!(pending.next(), Some(tx1)); + assert_eq!(pending.next(), Some(tx9)); + assert_eq!(pending.next(), Some(tx5)); + assert_eq!(pending.next(), Some(tx6)); + assert_eq!(pending.next(), Some(tx7)); + assert_eq!(pending.next(), Some(tx8)); + assert_eq!(pending.next(), Some(tx2)); + assert_eq!(pending.next(), None); +} + +#[test] +fn should_skip_staled_pending_transactions() { + let b = TransactionBuilder::default(); + let mut txq = TestPool::default(); + + let _tx0 = import(&mut txq, b.tx().nonce(0).gas_price(5).new()).unwrap(); + let tx2 = import(&mut txq, b.tx().nonce(2).gas_price(5).new()).unwrap(); + let _tx1 = import(&mut txq, b.tx().nonce(1).gas_price(5).new()).unwrap(); + + // tx0 and tx1 are Stale, tx2 is Ready + let mut pending = txq.pending(NonceReady::new(2), Default::default()); + + // tx0 and tx1 should be skipped, tx2 should be the next Ready + assert_eq!(pending.next(), Some(tx2)); + assert_eq!(pending.next(), None); +} + +#[test] +fn should_return_unordered_iterator() { + // given + let b = TransactionBuilder::default(); + let mut txq = TestPool::default(); + + let tx0 = import(&mut txq, b.tx().nonce(0).gas_price(5).new()).unwrap(); + let tx1 = import(&mut txq, b.tx().nonce(1).gas_price(5).new()).unwrap(); + let tx2 = import(&mut txq, b.tx().nonce(2).new()).unwrap(); + let tx3 = import(&mut txq, b.tx().nonce(3).gas_price(4).new()).unwrap(); + //gap + import(&mut txq, b.tx().nonce(5).new()).unwrap(); + + let tx5 = import(&mut txq, b.tx().sender(1).nonce(0).new()).unwrap(); + let tx6 = import(&mut txq, b.tx().sender(1).nonce(1).new()).unwrap(); + let tx7 = import(&mut txq, b.tx().sender(1).nonce(2).new()).unwrap(); + let tx8 = import(&mut txq, b.tx().sender(1).nonce(3).gas_price(4).new()).unwrap(); + // gap + import(&mut txq, b.tx().sender(1).nonce(5).new()).unwrap(); + + let tx9 = import(&mut txq, b.tx().sender(2).nonce(0).new()).unwrap(); + assert_eq!(txq.light_status().transaction_count, 11); + assert_eq!( + txq.status(NonceReady::default()), + Status { + stalled: 0, + pending: 9, + future: 2, + } + ); + assert_eq!( + txq.status(NonceReady::new(1)), + Status { + stalled: 3, + pending: 6, + future: 2, + } + ); + + // get all pending transaction in unordered way + let all: Vec<_> = txq + .unordered_pending(NonceReady::default(), Default::default()) + .collect(); + + let chain1 = vec![tx0, tx1, tx2, tx3]; + let chain2 = vec![tx5, tx6, tx7, tx8]; + let chain3 = vec![tx9]; + + assert_eq!(all.len(), chain1.len() + chain2.len() + chain3.len()); + + let mut options = vec![ + vec![chain1.clone(), chain2.clone(), chain3.clone()], + vec![chain2.clone(), chain1.clone(), chain3.clone()], + vec![chain2.clone(), chain3.clone(), chain1.clone()], + vec![chain3.clone(), chain2.clone(), chain1.clone()], + vec![chain3.clone(), chain1.clone(), chain2.clone()], + vec![chain1.clone(), chain3.clone(), chain2.clone()], + ] + .into_iter() + .map(|mut v| { + let mut first = v.pop().unwrap(); + for mut x in v { + first.append(&mut x); + } + first + }); + + assert!(options.any(|opt| all == opt)); + + // get only includable part of the pending transactions in unordered way + let includable: Vec<_> = txq + .unordered_pending(NonceReady::default(), U256::from(3)) + .collect(); + + assert_eq!(includable.len(), 4); +} + +#[test] +fn should_update_scoring_correctly() { + // given + let b = TransactionBuilder::default(); + let mut txq = TestPool::default(); + + let tx9 = import(&mut txq, b.tx().sender(2).nonce(0).new()).unwrap(); + + let tx5 = import(&mut txq, b.tx().sender(1).nonce(0).new()).unwrap(); + let tx6 = import(&mut txq, b.tx().sender(1).nonce(1).new()).unwrap(); + let tx7 = import(&mut txq, b.tx().sender(1).nonce(2).new()).unwrap(); + let tx8 = import(&mut txq, b.tx().sender(1).nonce(3).gas_price(4).new()).unwrap(); + + let tx0 = import(&mut txq, b.tx().nonce(0).gas_price(5).new()).unwrap(); + let tx1 = import(&mut txq, b.tx().nonce(1).gas_price(5).new()).unwrap(); + let tx2 = import(&mut txq, b.tx().nonce(2).new()).unwrap(); + // this transaction doesn't get to the block despite high gas price + // because of block gas limit and simplistic ordering algorithm. + import(&mut txq, b.tx().nonce(3).gas_price(4).new()).unwrap(); + //gap + import(&mut txq, b.tx().nonce(5).new()).unwrap(); + + // gap + import(&mut txq, b.tx().sender(1).nonce(5).new()).unwrap(); + + assert_eq!(txq.light_status().transaction_count, 11); + assert_eq!( + txq.status(NonceReady::default()), + Status { + stalled: 0, + pending: 9, + future: 2, + } + ); + assert_eq!( + txq.status(NonceReady::new(1)), + Status { + stalled: 3, + pending: 6, + future: 2, + } + ); + + txq.update_scores(&Address::zero(), helpers::DummyScoringEvent::Penalize); + + // when + let mut current_gas = U256::zero(); + let limit = (21_000 * 8).into(); + let mut pending = txq + .pending(NonceReady::default(), Default::default()) + .take_while(|tx| { + let should_take = tx.gas + current_gas <= limit; + if should_take { + current_gas = current_gas + tx.gas + } + should_take + }); + + assert_eq!(pending.next(), Some(tx9)); + assert_eq!(pending.next(), Some(tx5)); + assert_eq!(pending.next(), Some(tx6)); + assert_eq!(pending.next(), Some(tx7)); + assert_eq!(pending.next(), Some(tx8)); + // penalized transactions + assert_eq!(pending.next(), Some(tx0.clone())); + assert_eq!(pending.next(), Some(tx1.clone())); + assert_eq!(pending.next(), Some(tx2)); + assert_eq!(pending.next(), None); + + // update scores to initial values + txq.set_scoring( + DummyScoring::default(), + helpers::DummyScoringEvent::UpdateScores, + ); + + current_gas = U256::zero(); + let mut includable = txq + .pending(NonceReady::default(), Default::default()) + .take_while(|tx| { + let should_take = tx.gas + current_gas <= limit; + if should_take { + current_gas = current_gas + tx.gas + } + should_take + }); + + assert_eq!(includable.next(), Some(tx0)); + assert_eq!(includable.next(), Some(tx1)); +} + +#[test] +fn should_remove_transaction() { + // given + let b = TransactionBuilder::default(); + let mut txq = TestPool::default(); + + let tx1 = import(&mut txq, b.tx().nonce(0).new()).unwrap(); + let tx2 = import(&mut txq, b.tx().nonce(1).new()).unwrap(); + import(&mut txq, b.tx().nonce(2).new()).unwrap(); + assert_eq!(txq.light_status().transaction_count, 3); + + // when + assert!(txq.remove(&tx2.hash(), false).is_some()); + + // then + assert_eq!(txq.light_status().transaction_count, 2); + let mut pending = txq.pending(NonceReady::default(), Default::default()); + assert_eq!(pending.next(), Some(tx1)); + assert_eq!(pending.next(), None); +} + +#[test] +fn should_cull_stalled_transactions() { + // given + let b = TransactionBuilder::default(); + let mut txq = TestPool::default(); + + import(&mut txq, b.tx().nonce(0).gas_price(5).new()).unwrap(); + import(&mut txq, b.tx().nonce(1).new()).unwrap(); + import(&mut txq, b.tx().nonce(3).new()).unwrap(); + + import(&mut txq, b.tx().sender(1).nonce(0).new()).unwrap(); + import(&mut txq, b.tx().sender(1).nonce(1).new()).unwrap(); + import(&mut txq, b.tx().sender(1).nonce(5).new()).unwrap(); + + assert_eq!( + txq.status(NonceReady::new(1)), + Status { + stalled: 2, + pending: 2, + future: 2, + } + ); + + // when + assert_eq!(txq.cull(None, NonceReady::new(1)), 2); + + // then + assert_eq!( + txq.status(NonceReady::new(1)), + Status { + stalled: 0, + pending: 2, + future: 2, + } + ); + assert_eq!( + txq.light_status(), + LightStatus { + transaction_count: 4, + senders: 2, + mem_usage: 0, + } + ); +} + +#[test] +fn should_cull_stalled_transactions_from_a_sender() { + // given + let b = TransactionBuilder::default(); + let mut txq = TestPool::default(); + + import(&mut txq, b.tx().nonce(0).gas_price(5).new()).unwrap(); + import(&mut txq, b.tx().nonce(1).new()).unwrap(); + + import(&mut txq, b.tx().sender(1).nonce(0).new()).unwrap(); + import(&mut txq, b.tx().sender(1).nonce(1).new()).unwrap(); + import(&mut txq, b.tx().sender(1).nonce(2).new()).unwrap(); + + assert_eq!( + txq.status(NonceReady::new(2)), + Status { + stalled: 4, + pending: 1, + future: 0, + } + ); + + // when + let sender = Address::zero(); + assert_eq!(txq.cull(Some(&[sender]), NonceReady::new(2)), 2); + + // then + assert_eq!( + txq.status(NonceReady::new(2)), + Status { + stalled: 2, + pending: 1, + future: 0, + } + ); + assert_eq!( + txq.light_status(), + LightStatus { + transaction_count: 3, + senders: 1, + mem_usage: 0, + } + ); +} + +#[test] +fn should_re_insert_after_cull() { + // given + let b = TransactionBuilder::default(); + let mut txq = TestPool::default(); + + import(&mut txq, b.tx().nonce(0).gas_price(5).new()).unwrap(); + import(&mut txq, b.tx().nonce(1).new()).unwrap(); + import(&mut txq, b.tx().sender(1).nonce(0).new()).unwrap(); + import(&mut txq, b.tx().sender(1).nonce(1).new()).unwrap(); + assert_eq!( + txq.status(NonceReady::new(1)), + Status { + stalled: 2, + pending: 2, + future: 0, + } + ); + + // when + assert_eq!(txq.cull(None, NonceReady::new(1)), 2); + assert_eq!( + txq.status(NonceReady::new(1)), + Status { + stalled: 0, + pending: 2, + future: 0, + } + ); + import(&mut txq, b.tx().nonce(0).gas_price(5).new()).unwrap(); + import(&mut txq, b.tx().sender(1).nonce(0).new()).unwrap(); + + assert_eq!( + txq.status(NonceReady::new(1)), + Status { + stalled: 2, + pending: 2, + future: 0, + } + ); +} + +#[test] +fn should_return_worst_transaction() { + // given + let b = TransactionBuilder::default(); + let mut txq = TestPool::default(); + assert!(txq.worst_transaction().is_none()); + + // when + import(&mut txq, b.tx().nonce(0).gas_price(5).new()).unwrap(); + import(&mut txq, b.tx().sender(1).nonce(0).gas_price(4).new()).unwrap(); + + // then + assert_eq!(txq.worst_transaction().unwrap().gas_price, 4.into()); +} + +#[test] +fn should_return_is_full() { + // given + let b = TransactionBuilder::default(); + let mut txq = TestPool::with_limit(2); + assert!(!txq.is_full()); + + // when + import(&mut txq, b.tx().nonce(0).gas_price(110).new()).unwrap(); + assert!(!txq.is_full()); + + import(&mut txq, b.tx().sender(1).nonce(0).gas_price(100).new()).unwrap(); + + // then + assert!(txq.is_full()); +} + +#[test] +fn should_import_even_if_limit_is_reached_and_should_replace_returns_insert_new() { + // given + let b = TransactionBuilder::default(); + let mut txq = TestPool::with_scoring( + DummyScoring::always_insert(), + Options { + max_count: 1, + ..Default::default() + }, + ); + txq.import( + b.tx().nonce(0).gas_price(5).new(), + &mut DummyScoring::always_insert(), + ) + .unwrap(); + assert_eq!( + txq.light_status(), + LightStatus { + transaction_count: 1, + senders: 1, + mem_usage: 0, + } + ); + + // when + txq.import( + b.tx().nonce(1).gas_price(5).new(), + &mut DummyScoring::always_insert(), + ) + .unwrap(); + + // then + assert_eq!( + txq.light_status(), + LightStatus { + transaction_count: 2, + senders: 1, + mem_usage: 0, + } + ); +} + +#[test] +fn should_not_import_even_if_limit_is_reached_and_should_replace_returns_false() { + use std::str::FromStr; + + // given + let b = TransactionBuilder::default(); + let mut txq = TestPool::with_scoring( + DummyScoring::default(), + Options { + max_count: 1, + ..Default::default() + }, + ); + import(&mut txq, b.tx().nonce(0).gas_price(5).new()).unwrap(); + assert_eq!( + txq.light_status(), + LightStatus { + transaction_count: 1, + senders: 1, + mem_usage: 0, + } + ); + + // when + let err = import(&mut txq, b.tx().nonce(1).gas_price(5).new()).unwrap_err(); + + // then + assert_eq!( + err, + error::Error::TooCheapToEnter( + H256::from_str("00000000000000000000000000000000000000000000000000000000000001f5") + .unwrap(), + "0x5".into() + ) + ); + assert_eq!( + txq.light_status(), + LightStatus { + transaction_count: 1, + senders: 1, + mem_usage: 0, + } + ); +} + +#[test] +fn should_import_even_if_sender_limit_is_reached() { + // given + let b = TransactionBuilder::default(); + let mut txq = TestPool::with_scoring( + DummyScoring::always_insert(), + Options { + max_count: 1, + max_per_sender: 1, + ..Default::default() + }, + ); + txq.import( + b.tx().nonce(0).gas_price(5).new(), + &mut DummyScoring::always_insert(), + ) + .unwrap(); + assert_eq!( + txq.light_status(), + LightStatus { + transaction_count: 1, + senders: 1, + mem_usage: 0, + } + ); + + // when + txq.import( + b.tx().nonce(1).gas_price(5).new(), + &mut DummyScoring::always_insert(), + ) + .unwrap(); + + // then + assert_eq!( + txq.light_status(), + LightStatus { + transaction_count: 2, + senders: 1, + mem_usage: 0, + } + ); +} + +mod listener { + use std::{cell::RefCell, fmt, rc::Rc}; + + use super::*; + + #[derive(Default)] + struct MyListener(pub Rc>>); + + impl Listener for MyListener { + fn added(&mut self, _tx: &SharedTransaction, old: Option<&SharedTransaction>) { + self.0 + .borrow_mut() + .push(if old.is_some() { "replaced" } else { "added" }); + } + + fn rejected( + &mut self, + _tx: &SharedTransaction, + _reason: &error::Error, + ) { + self.0.borrow_mut().push("rejected".into()); + } + + fn dropped(&mut self, _tx: &SharedTransaction, _new: Option<&Transaction>) { + self.0.borrow_mut().push("dropped".into()); + } + + fn invalid(&mut self, _tx: &SharedTransaction) { + self.0.borrow_mut().push("invalid".into()); + } + + fn canceled(&mut self, _tx: &SharedTransaction) { + self.0.borrow_mut().push("canceled".into()); + } + + fn culled(&mut self, _tx: &SharedTransaction) { + self.0.borrow_mut().push("culled".into()); + } + } + + #[test] + fn insert_transaction() { + let b = TransactionBuilder::default(); + let listener = MyListener::default(); + let results = listener.0.clone(); + let mut txq = Pool::new( + listener, + DummyScoring::default(), + Options { + max_per_sender: 1, + max_count: 2, + ..Default::default() + }, + ); + assert!(results.borrow().is_empty()); + + // Regular import + import(&mut txq, b.tx().nonce(1).new()).unwrap(); + assert_eq!(*results.borrow(), &["added"]); + // Already present (no notification) + import(&mut txq, b.tx().nonce(1).new()).unwrap_err(); + assert_eq!(*results.borrow(), &["added"]); + // Push out the first one + import(&mut txq, b.tx().nonce(1).gas_price(1).new()).unwrap(); + assert_eq!(*results.borrow(), &["added", "replaced"]); + // Reject + import(&mut txq, b.tx().nonce(1).new()).unwrap_err(); + assert_eq!(*results.borrow(), &["added", "replaced", "rejected"]); + results.borrow_mut().clear(); + // Different sender (accept) + import(&mut txq, b.tx().sender(1).nonce(1).gas_price(2).new()).unwrap(); + assert_eq!(*results.borrow(), &["added"]); + // Third sender push out low gas price + import(&mut txq, b.tx().sender(2).nonce(1).gas_price(4).new()).unwrap(); + assert_eq!(*results.borrow(), &["added", "dropped", "added"]); + // Reject (too cheap) + import(&mut txq, b.tx().sender(2).nonce(1).gas_price(2).new()).unwrap_err(); + assert_eq!( + *results.borrow(), + &["added", "dropped", "added", "rejected"] + ); + + assert_eq!(txq.light_status().transaction_count, 2); + } + + #[test] + fn remove_transaction() { + let b = TransactionBuilder::default(); + let listener = MyListener::default(); + let results = listener.0.clone(); + let mut txq = Pool::new(listener, DummyScoring::default(), Options::default()); + + // insert + let tx1 = import(&mut txq, b.tx().nonce(1).new()).unwrap(); + let tx2 = import(&mut txq, b.tx().nonce(2).new()).unwrap(); + + // then + txq.remove(&tx1.hash(), false); + assert_eq!(*results.borrow(), &["added", "added", "canceled"]); + txq.remove(&tx2.hash(), true); + assert_eq!( + *results.borrow(), + &["added", "added", "canceled", "invalid"] + ); + assert_eq!(txq.light_status().transaction_count, 0); + } + + #[test] + fn clear_queue() { + let b = TransactionBuilder::default(); + let listener = MyListener::default(); + let results = listener.0.clone(); + let mut txq = Pool::new(listener, DummyScoring::default(), Options::default()); + + // insert + import(&mut txq, b.tx().nonce(1).new()).unwrap(); + import(&mut txq, b.tx().nonce(2).new()).unwrap(); + + // when + txq.clear(); + + // then + assert_eq!(*results.borrow(), &["added", "added", "dropped", "dropped"]); + } + + #[test] + fn cull_stalled() { + let b = TransactionBuilder::default(); + let listener = MyListener::default(); + let results = listener.0.clone(); + let mut txq = Pool::new(listener, DummyScoring::default(), Options::default()); + + // insert + import(&mut txq, b.tx().nonce(1).new()).unwrap(); + import(&mut txq, b.tx().nonce(2).new()).unwrap(); + + // when + txq.cull(None, NonceReady::new(3)); + + // then + assert_eq!(*results.borrow(), &["added", "added", "culled", "culled"]); + } +} diff --git a/src/tests/tx_builder.rs b/src/tests/tx_builder.rs new file mode 100644 index 0000000..8d1f2f4 --- /dev/null +++ b/src/tests/tx_builder.rs @@ -0,0 +1,67 @@ +// Copyright 2015-2018 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +use super::{Address, Transaction, H256, U256}; +use ethereum_types::BigEndianHash; + +#[derive(Debug, Default, Clone)] +pub struct TransactionBuilder { + nonce: U256, + gas_price: U256, + gas: U256, + sender: Address, + mem_usage: usize, +} + +impl TransactionBuilder { + pub fn tx(&self) -> Self { + self.clone() + } + + pub fn nonce(mut self, nonce: usize) -> Self { + self.nonce = U256::from(nonce); + self + } + + pub fn gas_price(mut self, gas_price: usize) -> Self { + self.gas_price = U256::from(gas_price); + self + } + + pub fn sender(mut self, sender: u64) -> Self { + self.sender = Address::from_low_u64_be(sender); + self + } + + pub fn mem_usage(mut self, mem_usage: usize) -> Self { + self.mem_usage = mem_usage; + self + } + + pub fn new(self) -> Transaction { + let hash: U256 = self.nonce + ^ (U256::from(100) * self.gas_price) + ^ (U256::from(100_000) * U256::from(self.sender.to_low_u64_be())); + Transaction { + hash: H256::from_uint(&hash), + nonce: self.nonce, + gas_price: self.gas_price, + gas: 21_000.into(), + sender: self.sender, + mem_usage: self.mem_usage, + } + } +} diff --git a/src/transactions.rs b/src/transactions.rs new file mode 100644 index 0000000..b194fc9 --- /dev/null +++ b/src/transactions.rs @@ -0,0 +1,268 @@ +// Copyright 2015-2018 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +use std::{fmt, mem}; + +use log::warn; +use smallvec::SmallVec; + +use crate::{ + pool::Transaction, + ready::{Readiness, Ready}, + scoring::{self, Scoring}, +}; + +#[derive(Debug)] +pub enum AddResult { + Ok(T), + TooCheapToEnter(T, S), + TooCheap { old: T, new: T }, + Replaced { old: T, new: T }, + PushedOut { old: T, new: T }, +} + +/// Represents all transactions from a particular sender ordered by nonce. +const PER_SENDER: usize = 8; +#[derive(Debug)] +pub struct Transactions> { + // TODO [ToDr] Consider using something that doesn't require shifting all records. + transactions: SmallVec<[Transaction; PER_SENDER]>, + scores: SmallVec<[S::Score; PER_SENDER]>, +} + +impl> Default for Transactions { + fn default() -> Self { + Transactions { + transactions: Default::default(), + scores: Default::default(), + } + } +} + +impl> Transactions { + pub fn is_empty(&self) -> bool { + self.transactions.is_empty() + } + + pub fn len(&self) -> usize { + self.transactions.len() + } + + pub fn iter_transactions(&self) -> ::std::slice::Iter> { + self.transactions.iter() + } + + pub fn iter_scores(&self) -> ::std::slice::Iter { + self.scores.iter() + } + + pub fn worst_and_best( + &self, + ) -> Option<((S::Score, Transaction), (S::Score, Transaction))> { + let len = self.scores.len(); + self.scores.get(0).cloned().map(|best| { + let worst = self.scores[len - 1].clone(); + let best_tx = self.transactions[0].clone(); + let worst_tx = self.transactions[len - 1].clone(); + + ((worst, worst_tx), (best, best_tx)) + }) + } + + pub fn find_next(&self, tx: &T, scoring: &S) -> Option<(S::Score, Transaction)> { + self.transactions + .binary_search_by(|old| scoring.compare(old, &tx)) + .ok() + .and_then(|index| { + let index = index + 1; + if index < self.scores.len() { + Some((self.scores[index].clone(), self.transactions[index].clone())) + } else { + None + } + }) + } + + fn push_cheapest_transaction( + &mut self, + tx: Transaction, + scoring: &S, + max_count: usize, + ) -> AddResult, S::Score> { + let index = self.transactions.len(); + if index == max_count && !scoring.should_ignore_sender_limit(&tx) { + let min_score = self.scores[index - 1].clone(); + AddResult::TooCheapToEnter(tx, min_score) + } else { + self.transactions.push(tx.clone()); + self.scores.push(Default::default()); + scoring.update_scores( + &self.transactions, + &mut self.scores, + scoring::Change::InsertedAt(index), + ); + + AddResult::Ok(tx) + } + } + + pub fn update_scores(&mut self, scoring: &S, event: S::Event) { + scoring.update_scores( + &self.transactions, + &mut self.scores, + scoring::Change::Event(event), + ); + } + + pub fn add( + &mut self, + new: Transaction, + scoring: &S, + max_count: usize, + ) -> AddResult, S::Score> { + let index = match self + .transactions + .binary_search_by(|old| scoring.compare(old, &new)) + { + Ok(index) => index, + Err(index) => index, + }; + + // Insert at the end. + if index == self.transactions.len() { + return self.push_cheapest_transaction(new, scoring, max_count); + } + + // Decide if the transaction should replace some other. + match scoring.choose(&self.transactions[index], &new) { + // New transaction should be rejected + scoring::Choice::RejectNew => AddResult::TooCheap { + old: self.transactions[index].clone(), + new, + }, + // New transaction should be kept along with old ones. + scoring::Choice::InsertNew => { + self.transactions.insert(index, new.clone()); + self.scores.insert(index, Default::default()); + scoring.update_scores( + &self.transactions, + &mut self.scores, + scoring::Change::InsertedAt(index), + ); + + if self.transactions.len() > max_count { + let old = self.transactions.pop().expect("len is non-zero"); + self.scores.pop(); + scoring.update_scores( + &self.transactions, + &mut self.scores, + scoring::Change::RemovedAt(self.transactions.len()), + ); + + AddResult::PushedOut { old, new } + } else { + AddResult::Ok(new) + } + } + // New transaction is replacing some other transaction already in the queue. + scoring::Choice::ReplaceOld => { + let old = mem::replace(&mut self.transactions[index], new.clone()); + scoring.update_scores( + &self.transactions, + &mut self.scores, + scoring::Change::ReplacedAt(index), + ); + + AddResult::Replaced { old, new } + } + } + } + + pub fn remove(&mut self, tx: &T, scoring: &S) -> bool { + let index = match self + .transactions + .binary_search_by(|old| scoring.compare(old, tx)) + { + Ok(index) => index, + Err(_) => { + warn!("Attempting to remove non-existent transaction {:?}", tx); + return false; + } + }; + + self.transactions.remove(index); + self.scores.remove(index); + // Update scoring + scoring.update_scores( + &self.transactions, + &mut self.scores, + scoring::Change::RemovedAt(index), + ); + return true; + } + + pub fn cull>( + &mut self, + ready: &mut R, + scoring: &S, + ) -> SmallVec<[Transaction; PER_SENDER]> { + let mut result = SmallVec::new(); + if self.is_empty() { + return result; + } + + let mut first_non_stalled = 0; + for tx in &self.transactions { + match ready.is_ready(tx) { + Readiness::Stale => { + first_non_stalled += 1; + } + Readiness::Ready | Readiness::Future => break, + } + } + + if first_non_stalled == 0 { + return result; + } + + // reverse the vectors to easily remove first elements. + self.transactions.reverse(); + self.scores.reverse(); + + for _ in 0..first_non_stalled { + self.scores.pop(); + result.push( + self.transactions + .pop() + .expect("first_non_stalled is never greater than transactions.len(); qed"), + ); + } + + self.transactions.reverse(); + self.scores.reverse(); + + // update scoring + scoring.update_scores( + &self.transactions, + &mut self.scores, + scoring::Change::Culled(result.len()), + ); + + // reverse the result to maintain correct order. + result.reverse(); + result + } +} diff --git a/src/verifier.rs b/src/verifier.rs new file mode 100644 index 0000000..a84242b --- /dev/null +++ b/src/verifier.rs @@ -0,0 +1,31 @@ +// Copyright 2015-2018 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +use crate::VerifiedTransaction; + +/// Transaction verification. +/// +/// Verifier is responsible to decide if the transaction should even be considered for pool inclusion. +pub trait Verifier { + /// Verification error. + type Error; + + /// Verified transaction. + type VerifiedTransaction: VerifiedTransaction; + + /// Verifies a `UnverifiedTransaction` and produces `VerifiedTransaction` instance. + fn verify_transaction(&self, tx: U) -> Result; +}