Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Transaction pool: Ensure that we prune transactions properly (#8963)
Browse files Browse the repository at this point in the history
* Transaction pool: Ensure that we prune transactions properly

There was a bug in the transaction pool that we didn't pruned
transactions properly because we called `prune_known`, instead of `prune`.

This bug was introduced by:
#4629

This is required to have stale extrinsics being removed properly, so
that they don't fill up the tx pool.

* Fix compilation

* Fix benches

* ...
  • Loading branch information
bkchr authored Jun 3, 2021
1 parent d6e4db6 commit ea5d357
Show file tree
Hide file tree
Showing 8 changed files with 170 additions and 68 deletions.
9 changes: 8 additions & 1 deletion client/transaction-pool/graph/benches/basics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use sc_transaction_graph::*;
use codec::Encode;
use substrate_test_runtime::{Block, Extrinsic, Transfer, H256, AccountId};
use sp_runtime::{
generic::BlockId,
generic::BlockId, traits::Block as BlockT,
transaction_validity::{
ValidTransaction, InvalidTransaction, TransactionValidity, TransactionTag as Tag,
TransactionSource,
Expand Down Expand Up @@ -114,6 +114,13 @@ impl ChainApi for TestApi {
fn block_body(&self, _id: &BlockId<Self::Block>) -> Self::BodyFuture {
ready(Ok(None))
}

fn block_header(
&self,
_: &BlockId<Self::Block>,
) -> Result<Option<<Self::Block as BlockT>::Header>, Self::Error> {
Ok(None)
}
}

fn uxt(transfer: Transfer) -> Extrinsic {
Expand Down
10 changes: 3 additions & 7 deletions client/transaction-pool/graph/src/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::{

use linked_hash_map::LinkedHashMap;
use serde::Serialize;
use log::{debug, trace, warn};
use log::{debug, trace};
use sp_runtime::traits;

use crate::{watcher, ChainApi, ExtrinsicHash, BlockHash};
Expand Down Expand Up @@ -99,12 +99,8 @@ impl<H: hash::Hash + traits::Member + Serialize, C: ChainApi> Listener<H, C> {
}

/// Transaction was removed as invalid.
pub fn invalid(&mut self, tx: &H, warn: bool) {
if warn {
warn!(target: "txpool", "[{:?}] Extrinsic invalid", tx);
} else {
debug!(target: "txpool", "[{:?}] Extrinsic invalid", tx);
}
pub fn invalid(&mut self, tx: &H) {
debug!(target: "txpool", "[{:?}] Extrinsic invalid", tx);
self.fire(tx, |watcher| watcher.invalid());
}

Expand Down
15 changes: 14 additions & 1 deletion client/transaction-pool/graph/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,12 @@ pub trait ChainApi: Send + Sync {

/// Returns a block body given the block id.
fn block_body(&self, at: &BlockId<Self::Block>) -> Self::BodyFuture;

/// Returns a block header given the block id.
fn block_header(
&self,
at: &BlockId<Self::Block>,
) -> Result<Option<<Self::Block as BlockT>::Header>, Self::Error>;
}

/// Pool configuration options.
Expand Down Expand Up @@ -237,7 +243,7 @@ impl<B: ChainApi> Pool<B> {
) -> Result<(), B::Error> {
// Get details of all extrinsics that are already in the pool
let in_pool_tags = self.validated_pool.extrinsics_tags(hashes)
.into_iter().filter_map(|x| x).flat_map(|x| x);
.into_iter().filter_map(|x| x).flatten();

// Prune all transactions that provide given tags
let prune_status = self.validated_pool.prune_tags(in_pool_tags)?;
Expand Down Expand Up @@ -579,6 +585,13 @@ mod tests {
fn block_body(&self, _id: &BlockId<Self::Block>) -> Self::BodyFuture {
futures::future::ready(Ok(None))
}

fn block_header(
&self,
_: &BlockId<Self::Block>,
) -> Result<Option<<Self::Block as BlockT>::Header>, Self::Error> {
Ok(None)
}
}

fn uxt(transfer: Transfer) -> Extrinsic {
Expand Down
26 changes: 11 additions & 15 deletions client/transaction-pool/graph/src/validated_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ impl<B: ChainApi> ValidatedPool<B> {
Err(err)
},
ValidatedTransaction::Unknown(hash, err) => {
self.listener.write().invalid(&hash, false);
self.listener.write().invalid(&hash);
Err(err)
},
}
Expand Down Expand Up @@ -415,18 +415,20 @@ impl<B: ChainApi> ValidatedPool<B> {
Status::Future => listener.future(&hash),
Status::Ready => listener.ready(&hash, None),
Status::Dropped => listener.dropped(&hash, None),
Status::Failed => listener.invalid(&hash, initial_status.is_some()),
Status::Failed => listener.invalid(&hash),
}
}
}
}

/// For each extrinsic, returns tags that it provides (if known), or None (if it is unknown).
pub fn extrinsics_tags(&self, hashes: &[ExtrinsicHash<B>]) -> Vec<Option<Vec<Tag>>> {
self.pool.read().by_hashes(&hashes)
self.pool.read()
.by_hashes(&hashes)
.into_iter()
.map(|existing_in_pool| existing_in_pool
.map(|transaction| transaction.provides.to_vec()))
.map(|existing_in_pool|
existing_in_pool.map(|transaction| transaction.provides.to_vec())
)
.collect()
}

Expand Down Expand Up @@ -599,7 +601,7 @@ impl<B: ChainApi> ValidatedPool<B> {

let mut listener = self.listener.write();
for tx in &invalid {
listener.invalid(&tx.hash, true);
listener.invalid(&tx.hash);
}

invalid
Expand Down Expand Up @@ -645,15 +647,9 @@ fn fire_events<H, B, Ex>(
match *imported {
base::Imported::Ready { ref promoted, ref failed, ref removed, ref hash } => {
listener.ready(hash, None);
for f in failed {
listener.invalid(f, true);
}
for r in removed {
listener.dropped(&r.hash, Some(hash));
}
for p in promoted {
listener.ready(p, None);
}
failed.into_iter().for_each(|f| listener.invalid(f));
removed.into_iter().for_each(|r| listener.dropped(&r.hash, Some(hash)));
promoted.into_iter().for_each(|p| listener.ready(p, None));
},
base::Imported::Future { ref hash } => {
listener.future(hash)
Expand Down
20 changes: 17 additions & 3 deletions client/transaction-pool/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ impl<Client, Block> FullChainApi<Client, Block> {
impl<Client, Block> sc_transaction_graph::ChainApi for FullChainApi<Client, Block>
where
Block: BlockT,
Client: ProvideRuntimeApi<Block> + BlockBackend<Block> + BlockIdTo<Block>,
Client: ProvideRuntimeApi<Block> + BlockBackend<Block> + BlockIdTo<Block> + HeaderBackend<Block>,
Client: Send + Sync + 'static,
Client::Api: TaggedTransactionQueue<Block>,
{
Expand Down Expand Up @@ -150,6 +150,13 @@ where
(<traits::HashFor::<Block> as traits::Hash>::hash(x), x.len())
})
}

fn block_header(
&self,
at: &BlockId<Self::Block>,
) -> Result<Option<<Self::Block as BlockT>::Header>, Self::Error> {
self.client.header(*at).map_err(Into::into)
}
}

/// Helper function to validate a transaction using a full chain API.
Expand All @@ -162,7 +169,7 @@ fn validate_transaction_blocking<Client, Block>(
) -> error::Result<TransactionValidity>
where
Block: BlockT,
Client: ProvideRuntimeApi<Block> + BlockBackend<Block> + BlockIdTo<Block>,
Client: ProvideRuntimeApi<Block> + BlockBackend<Block> + BlockIdTo<Block> + HeaderBackend<Block>,
Client: Send + Sync + 'static,
Client::Api: TaggedTransactionQueue<Block>,
{
Expand Down Expand Up @@ -193,7 +200,7 @@ where
impl<Client, Block> FullChainApi<Client, Block>
where
Block: BlockT,
Client: ProvideRuntimeApi<Block> + BlockBackend<Block> + BlockIdTo<Block>,
Client: ProvideRuntimeApi<Block> + BlockBackend<Block> + BlockIdTo<Block> + HeaderBackend<Block>,
Client: Send + Sync + 'static,
Client::Api: TaggedTransactionQueue<Block>,
{
Expand Down Expand Up @@ -333,4 +340,11 @@ impl<Client, F, Block> sc_transaction_graph::ChainApi for
Ok(Some(transactions))
}.boxed()
}

fn block_header(
&self,
at: &BlockId<Self::Block>,
) -> Result<Option<<Self::Block as BlockT>::Header>, Self::Error> {
self.client.header(*at).map_err(Into::into)
}
}
25 changes: 20 additions & 5 deletions client/transaction-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use parking_lot::Mutex;

use sp_runtime::{
generic::BlockId,
traits::{Block as BlockT, NumberFor, AtLeast32Bit, Extrinsic, Zero},
traits::{Block as BlockT, NumberFor, AtLeast32Bit, Extrinsic, Zero, Header as HeaderT},
};
use sp_core::traits::SpawnNamed;
use sp_transaction_pool::{
Expand Down Expand Up @@ -379,6 +379,7 @@ where
Block: BlockT,
Client: sp_api::ProvideRuntimeApi<Block>
+ sc_client_api::BlockBackend<Block>
+ sc_client_api::blockchain::HeaderBackend<Block>
+ sp_runtime::traits::BlockIdTo<Block>
+ sc_client_api::ExecutorProvider<Block>
+ sc_client_api::UsageProvider<Block>
Expand Down Expand Up @@ -419,6 +420,7 @@ where
Block: BlockT,
Client: sp_api::ProvideRuntimeApi<Block>
+ sc_client_api::BlockBackend<Block>
+ sc_client_api::blockchain::HeaderBackend<Block>
+ sp_runtime::traits::BlockIdTo<Block>,
Client: Send + Sync + 'static,
Client::Api: sp_transaction_pool::runtime_api::TaggedTransactionQueue<Block>,
Expand Down Expand Up @@ -555,19 +557,32 @@ async fn prune_known_txs_for_block<Block: BlockT, Api: ChainApi<Block = Block>>(
api: &Api,
pool: &sc_transaction_graph::Pool<Api>,
) -> Vec<ExtrinsicHash<Api>> {
let hashes = api.block_body(&block_id).await
let extrinsics = api.block_body(&block_id).await
.unwrap_or_else(|e| {
log::warn!("Prune known transactions: error request {:?}!", e);
None
})
.unwrap_or_default()
.into_iter()
.unwrap_or_default();

let hashes = extrinsics.iter()
.map(|tx| pool.hash_of(&tx))
.collect::<Vec<_>>();

log::trace!(target: "txpool", "Pruning transactions: {:?}", hashes);

if let Err(e) = pool.prune_known(&block_id, &hashes) {
let header = match api.block_header(&block_id) {
Ok(Some(h)) => h,
Ok(None) => {
log::debug!(target: "txpool", "Could not find header for {:?}.", block_id);
return hashes
},
Err(e) => {
log::debug!(target: "txpool", "Error retrieving header for {:?}: {:?}", block_id, e);
return hashes
}
};

if let Err(e) = pool.prune(&block_id, &BlockId::hash(*header.parent_hash()), &extrinsics).await {
log::error!("Cannot prune known in the pool {:?}!", e);
}

Expand Down
113 changes: 78 additions & 35 deletions client/transaction-pool/src/testing/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,31 +306,6 @@ fn should_not_retain_invalid_hashes_from_retracted() {
assert_eq!(pool.status().ready, 0);
}

#[test]
fn should_revalidate_transaction_multiple_times() {
let xt = uxt(Alice, 209);

let (pool, _guard, mut notifier) = maintained_pool();

block_on(pool.submit_one(&BlockId::number(0), SOURCE, xt.clone())).expect("1. Imported");
assert_eq!(pool.status().ready, 1);

let header = pool.api.push_block(1, vec![xt.clone()], true);

block_on(pool.maintain(block_event(header)));

block_on(pool.submit_one(&BlockId::number(0), SOURCE, xt.clone())).expect("1. Imported");
assert_eq!(pool.status().ready, 1);

let header = pool.api.push_block(2, vec![], true);
pool.api.add_invalid(&xt);

block_on(pool.maintain(block_event(header)));
block_on(notifier.next());

assert_eq!(pool.status().ready, 0);
}

#[test]
fn should_revalidate_across_many_blocks() {
let xt1 = uxt(Alice, 209);
Expand Down Expand Up @@ -1002,21 +977,13 @@ fn pruning_a_transaction_should_remove_it_from_best_transaction() {
let xt1 = Extrinsic::IncludeData(Vec::new());

block_on(pool.submit_one(&BlockId::number(0), SOURCE, xt1.clone())).expect("1. Imported");
assert_eq!(pool.status().ready, 1);
let header = pool.api.push_block(1, vec![xt1.clone()], true);

// This will prune `xt1`.
block_on(pool.maintain(block_event(header)));

// Submit the tx again.
block_on(pool.submit_one(&BlockId::number(1), SOURCE, xt1.clone())).expect("2. Imported");

let mut iterator = block_on(pool.ready_at(1));

assert_eq!(iterator.next().unwrap().data, xt1.clone());

// If the tx was not removed from the best txs, the tx would be
// returned a second time by the iterator.
assert!(iterator.next().is_none());
assert_eq!(pool.status().ready, 0);
}

#[test]
Expand All @@ -1038,3 +1005,79 @@ fn only_revalidate_on_best_block() {

assert_eq!(pool.status().ready, 1);
}

#[test]
fn stale_transactions_are_pruned() {
sp_tracing::try_init_simple();

// Our initial transactions
let xts = vec![
Transfer {
from: Alice.into(),
to: Bob.into(),
nonce: 1,
amount: 1,
},
Transfer {
from: Alice.into(),
to: Bob.into(),
nonce: 2,
amount: 1,
},
Transfer {
from: Alice.into(),
to: Bob.into(),
nonce: 3,
amount: 1,
},
];

let (pool, _guard, _notifier) = maintained_pool();

xts.into_iter().for_each(|xt| {
block_on(
pool.submit_one(&BlockId::number(0), SOURCE, xt.into_signed_tx()),
).expect("1. Imported");
});
assert_eq!(pool.status().ready, 0);
assert_eq!(pool.status().future, 3);

// Almost the same as our initial transactions, but with some different `amount`s to make them
// generate a different hash
let xts = vec![
Transfer {
from: Alice.into(),
to: Bob.into(),
nonce: 1,
amount: 2,
}.into_signed_tx(),
Transfer {
from: Alice.into(),
to: Bob.into(),
nonce: 2,
amount: 2,
}.into_signed_tx(),
Transfer {
from: Alice.into(),
to: Bob.into(),
nonce: 3,
amount: 2,
}.into_signed_tx(),
];

// Import block
let header = pool.api.push_block(1, xts, true);
block_on(pool.maintain(block_event(header)));
// The imported transactions have a different hash and should not evict our initial
// transactions.
assert_eq!(pool.status().future, 3);

// Import enough blocks to make our transactions stale
for n in 1..66 {
let header = pool.api.push_block(n, vec![], true);
block_on(pool.maintain(block_event(header)));
}

assert_eq!(pool.status().future, 0);
assert_eq!(pool.status().ready, 0);
}
Loading

0 comments on commit ea5d357

Please sign in to comment.