From ea5d3570673d125dfe0b7da33b345c3c13195380 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Thu, 3 Jun 2021 16:04:29 +0200 Subject: [PATCH] Transaction pool: Ensure that we prune transactions properly (#8963) * Transaction pool: Ensure that we prune transactions properly There was a bug in the transaction pool that we didn't pruned transactions properly because we called `prune_known`, instead of `prune`. This bug was introduced by: https://github.com/paritytech/substrate/pull/4629 This is required to have stale extrinsics being removed properly, so that they don't fill up the tx pool. * Fix compilation * Fix benches * ... --- .../transaction-pool/graph/benches/basics.rs | 9 +- client/transaction-pool/graph/src/listener.rs | 10 +- client/transaction-pool/graph/src/pool.rs | 15 ++- .../graph/src/validated_pool.rs | 26 ++-- client/transaction-pool/src/api.rs | 20 +++- client/transaction-pool/src/lib.rs | 25 +++- client/transaction-pool/src/testing/pool.rs | 113 ++++++++++++------ .../runtime/transaction-pool/src/lib.rs | 20 +++- 8 files changed, 170 insertions(+), 68 deletions(-) diff --git a/client/transaction-pool/graph/benches/basics.rs b/client/transaction-pool/graph/benches/basics.rs index 21e3d1006d5df..0c55c931eb212 100644 --- a/client/transaction-pool/graph/benches/basics.rs +++ b/client/transaction-pool/graph/benches/basics.rs @@ -23,7 +23,7 @@ use sc_transaction_graph::*; use codec::Encode; use substrate_test_runtime::{Block, Extrinsic, Transfer, H256, AccountId}; use sp_runtime::{ - generic::BlockId, + generic::BlockId, traits::Block as BlockT, transaction_validity::{ ValidTransaction, InvalidTransaction, TransactionValidity, TransactionTag as Tag, TransactionSource, @@ -114,6 +114,13 @@ impl ChainApi for TestApi { fn block_body(&self, _id: &BlockId) -> Self::BodyFuture { ready(Ok(None)) } + + fn block_header( + &self, + _: &BlockId, + ) -> Result::Header>, Self::Error> { + Ok(None) + } } fn uxt(transfer: Transfer) -> Extrinsic { diff --git a/client/transaction-pool/graph/src/listener.rs b/client/transaction-pool/graph/src/listener.rs index 563243bf45945..e81c286600274 100644 --- a/client/transaction-pool/graph/src/listener.rs +++ b/client/transaction-pool/graph/src/listener.rs @@ -23,7 +23,7 @@ use std::{ use linked_hash_map::LinkedHashMap; use serde::Serialize; -use log::{debug, trace, warn}; +use log::{debug, trace}; use sp_runtime::traits; use crate::{watcher, ChainApi, ExtrinsicHash, BlockHash}; @@ -99,12 +99,8 @@ impl Listener { } /// Transaction was removed as invalid. - pub fn invalid(&mut self, tx: &H, warn: bool) { - if warn { - warn!(target: "txpool", "[{:?}] Extrinsic invalid", tx); - } else { - debug!(target: "txpool", "[{:?}] Extrinsic invalid", tx); - } + pub fn invalid(&mut self, tx: &H) { + debug!(target: "txpool", "[{:?}] Extrinsic invalid", tx); self.fire(tx, |watcher| watcher.invalid()); } diff --git a/client/transaction-pool/graph/src/pool.rs b/client/transaction-pool/graph/src/pool.rs index 7f9bc3c757f11..4f132550d7036 100644 --- a/client/transaction-pool/graph/src/pool.rs +++ b/client/transaction-pool/graph/src/pool.rs @@ -95,6 +95,12 @@ pub trait ChainApi: Send + Sync { /// Returns a block body given the block id. fn block_body(&self, at: &BlockId) -> Self::BodyFuture; + + /// Returns a block header given the block id. + fn block_header( + &self, + at: &BlockId, + ) -> Result::Header>, Self::Error>; } /// Pool configuration options. @@ -237,7 +243,7 @@ impl Pool { ) -> Result<(), B::Error> { // Get details of all extrinsics that are already in the pool let in_pool_tags = self.validated_pool.extrinsics_tags(hashes) - .into_iter().filter_map(|x| x).flat_map(|x| x); + .into_iter().filter_map(|x| x).flatten(); // Prune all transactions that provide given tags let prune_status = self.validated_pool.prune_tags(in_pool_tags)?; @@ -579,6 +585,13 @@ mod tests { fn block_body(&self, _id: &BlockId) -> Self::BodyFuture { futures::future::ready(Ok(None)) } + + fn block_header( + &self, + _: &BlockId, + ) -> Result::Header>, Self::Error> { + Ok(None) + } } fn uxt(transfer: Transfer) -> Extrinsic { diff --git a/client/transaction-pool/graph/src/validated_pool.rs b/client/transaction-pool/graph/src/validated_pool.rs index b9c2593f019c2..ec05106896f2c 100644 --- a/client/transaction-pool/graph/src/validated_pool.rs +++ b/client/transaction-pool/graph/src/validated_pool.rs @@ -230,7 +230,7 @@ impl ValidatedPool { Err(err) }, ValidatedTransaction::Unknown(hash, err) => { - self.listener.write().invalid(&hash, false); + self.listener.write().invalid(&hash); Err(err) }, } @@ -415,7 +415,7 @@ impl ValidatedPool { Status::Future => listener.future(&hash), Status::Ready => listener.ready(&hash, None), Status::Dropped => listener.dropped(&hash, None), - Status::Failed => listener.invalid(&hash, initial_status.is_some()), + Status::Failed => listener.invalid(&hash), } } } @@ -423,10 +423,12 @@ impl ValidatedPool { /// For each extrinsic, returns tags that it provides (if known), or None (if it is unknown). pub fn extrinsics_tags(&self, hashes: &[ExtrinsicHash]) -> Vec>> { - self.pool.read().by_hashes(&hashes) + self.pool.read() + .by_hashes(&hashes) .into_iter() - .map(|existing_in_pool| existing_in_pool - .map(|transaction| transaction.provides.to_vec())) + .map(|existing_in_pool| + existing_in_pool.map(|transaction| transaction.provides.to_vec()) + ) .collect() } @@ -599,7 +601,7 @@ impl ValidatedPool { let mut listener = self.listener.write(); for tx in &invalid { - listener.invalid(&tx.hash, true); + listener.invalid(&tx.hash); } invalid @@ -645,15 +647,9 @@ fn fire_events( match *imported { base::Imported::Ready { ref promoted, ref failed, ref removed, ref hash } => { listener.ready(hash, None); - for f in failed { - listener.invalid(f, true); - } - for r in removed { - listener.dropped(&r.hash, Some(hash)); - } - for p in promoted { - listener.ready(p, None); - } + failed.into_iter().for_each(|f| listener.invalid(f)); + removed.into_iter().for_each(|r| listener.dropped(&r.hash, Some(hash))); + promoted.into_iter().for_each(|p| listener.ready(p, None)); }, base::Imported::Future { ref hash } => { listener.future(hash) diff --git a/client/transaction-pool/src/api.rs b/client/transaction-pool/src/api.rs index 2ebf038844fab..09864f78248a3 100644 --- a/client/transaction-pool/src/api.rs +++ b/client/transaction-pool/src/api.rs @@ -81,7 +81,7 @@ impl FullChainApi { impl sc_transaction_graph::ChainApi for FullChainApi where Block: BlockT, - Client: ProvideRuntimeApi + BlockBackend + BlockIdTo, + Client: ProvideRuntimeApi + BlockBackend + BlockIdTo + HeaderBackend, Client: Send + Sync + 'static, Client::Api: TaggedTransactionQueue, { @@ -150,6 +150,13 @@ where ( as traits::Hash>::hash(x), x.len()) }) } + + fn block_header( + &self, + at: &BlockId, + ) -> Result::Header>, Self::Error> { + self.client.header(*at).map_err(Into::into) + } } /// Helper function to validate a transaction using a full chain API. @@ -162,7 +169,7 @@ fn validate_transaction_blocking( ) -> error::Result where Block: BlockT, - Client: ProvideRuntimeApi + BlockBackend + BlockIdTo, + Client: ProvideRuntimeApi + BlockBackend + BlockIdTo + HeaderBackend, Client: Send + Sync + 'static, Client::Api: TaggedTransactionQueue, { @@ -193,7 +200,7 @@ where impl FullChainApi where Block: BlockT, - Client: ProvideRuntimeApi + BlockBackend + BlockIdTo, + Client: ProvideRuntimeApi + BlockBackend + BlockIdTo + HeaderBackend, Client: Send + Sync + 'static, Client::Api: TaggedTransactionQueue, { @@ -333,4 +340,11 @@ impl sc_transaction_graph::ChainApi for Ok(Some(transactions)) }.boxed() } + + fn block_header( + &self, + at: &BlockId, + ) -> Result::Header>, Self::Error> { + self.client.header(*at).map_err(Into::into) + } } diff --git a/client/transaction-pool/src/lib.rs b/client/transaction-pool/src/lib.rs index 32bea107d8acc..0cd47f870d1af 100644 --- a/client/transaction-pool/src/lib.rs +++ b/client/transaction-pool/src/lib.rs @@ -40,7 +40,7 @@ use parking_lot::Mutex; use sp_runtime::{ generic::BlockId, - traits::{Block as BlockT, NumberFor, AtLeast32Bit, Extrinsic, Zero}, + traits::{Block as BlockT, NumberFor, AtLeast32Bit, Extrinsic, Zero, Header as HeaderT}, }; use sp_core::traits::SpawnNamed; use sp_transaction_pool::{ @@ -379,6 +379,7 @@ where Block: BlockT, Client: sp_api::ProvideRuntimeApi + sc_client_api::BlockBackend + + sc_client_api::blockchain::HeaderBackend + sp_runtime::traits::BlockIdTo + sc_client_api::ExecutorProvider + sc_client_api::UsageProvider @@ -419,6 +420,7 @@ where Block: BlockT, Client: sp_api::ProvideRuntimeApi + sc_client_api::BlockBackend + + sc_client_api::blockchain::HeaderBackend + sp_runtime::traits::BlockIdTo, Client: Send + Sync + 'static, Client::Api: sp_transaction_pool::runtime_api::TaggedTransactionQueue, @@ -555,19 +557,32 @@ async fn prune_known_txs_for_block>( api: &Api, pool: &sc_transaction_graph::Pool, ) -> Vec> { - let hashes = api.block_body(&block_id).await + let extrinsics = api.block_body(&block_id).await .unwrap_or_else(|e| { log::warn!("Prune known transactions: error request {:?}!", e); None }) - .unwrap_or_default() - .into_iter() + .unwrap_or_default(); + + let hashes = extrinsics.iter() .map(|tx| pool.hash_of(&tx)) .collect::>(); log::trace!(target: "txpool", "Pruning transactions: {:?}", hashes); - if let Err(e) = pool.prune_known(&block_id, &hashes) { + let header = match api.block_header(&block_id) { + Ok(Some(h)) => h, + Ok(None) => { + log::debug!(target: "txpool", "Could not find header for {:?}.", block_id); + return hashes + }, + Err(e) => { + log::debug!(target: "txpool", "Error retrieving header for {:?}: {:?}", block_id, e); + return hashes + } + }; + + if let Err(e) = pool.prune(&block_id, &BlockId::hash(*header.parent_hash()), &extrinsics).await { log::error!("Cannot prune known in the pool {:?}!", e); } diff --git a/client/transaction-pool/src/testing/pool.rs b/client/transaction-pool/src/testing/pool.rs index 904870ae0ece9..999d1ab65eb65 100644 --- a/client/transaction-pool/src/testing/pool.rs +++ b/client/transaction-pool/src/testing/pool.rs @@ -306,31 +306,6 @@ fn should_not_retain_invalid_hashes_from_retracted() { assert_eq!(pool.status().ready, 0); } -#[test] -fn should_revalidate_transaction_multiple_times() { - let xt = uxt(Alice, 209); - - let (pool, _guard, mut notifier) = maintained_pool(); - - block_on(pool.submit_one(&BlockId::number(0), SOURCE, xt.clone())).expect("1. Imported"); - assert_eq!(pool.status().ready, 1); - - let header = pool.api.push_block(1, vec![xt.clone()], true); - - block_on(pool.maintain(block_event(header))); - - block_on(pool.submit_one(&BlockId::number(0), SOURCE, xt.clone())).expect("1. Imported"); - assert_eq!(pool.status().ready, 1); - - let header = pool.api.push_block(2, vec![], true); - pool.api.add_invalid(&xt); - - block_on(pool.maintain(block_event(header))); - block_on(notifier.next()); - - assert_eq!(pool.status().ready, 0); -} - #[test] fn should_revalidate_across_many_blocks() { let xt1 = uxt(Alice, 209); @@ -1002,21 +977,13 @@ fn pruning_a_transaction_should_remove_it_from_best_transaction() { let xt1 = Extrinsic::IncludeData(Vec::new()); block_on(pool.submit_one(&BlockId::number(0), SOURCE, xt1.clone())).expect("1. Imported"); + assert_eq!(pool.status().ready, 1); let header = pool.api.push_block(1, vec![xt1.clone()], true); // This will prune `xt1`. block_on(pool.maintain(block_event(header))); - // Submit the tx again. - block_on(pool.submit_one(&BlockId::number(1), SOURCE, xt1.clone())).expect("2. Imported"); - - let mut iterator = block_on(pool.ready_at(1)); - - assert_eq!(iterator.next().unwrap().data, xt1.clone()); - - // If the tx was not removed from the best txs, the tx would be - // returned a second time by the iterator. - assert!(iterator.next().is_none()); + assert_eq!(pool.status().ready, 0); } #[test] @@ -1038,3 +1005,79 @@ fn only_revalidate_on_best_block() { assert_eq!(pool.status().ready, 1); } + +#[test] +fn stale_transactions_are_pruned() { + sp_tracing::try_init_simple(); + + // Our initial transactions + let xts = vec![ + Transfer { + from: Alice.into(), + to: Bob.into(), + nonce: 1, + amount: 1, + }, + Transfer { + from: Alice.into(), + to: Bob.into(), + nonce: 2, + amount: 1, + }, + Transfer { + from: Alice.into(), + to: Bob.into(), + nonce: 3, + amount: 1, + }, + ]; + + let (pool, _guard, _notifier) = maintained_pool(); + + xts.into_iter().for_each(|xt| { + block_on( + pool.submit_one(&BlockId::number(0), SOURCE, xt.into_signed_tx()), + ).expect("1. Imported"); + }); + assert_eq!(pool.status().ready, 0); + assert_eq!(pool.status().future, 3); + + // Almost the same as our initial transactions, but with some different `amount`s to make them + // generate a different hash + let xts = vec![ + Transfer { + from: Alice.into(), + to: Bob.into(), + nonce: 1, + amount: 2, + }.into_signed_tx(), + Transfer { + from: Alice.into(), + to: Bob.into(), + nonce: 2, + amount: 2, + }.into_signed_tx(), + Transfer { + from: Alice.into(), + to: Bob.into(), + nonce: 3, + amount: 2, + }.into_signed_tx(), + ]; + + // Import block + let header = pool.api.push_block(1, xts, true); + block_on(pool.maintain(block_event(header))); + // The imported transactions have a different hash and should not evict our initial + // transactions. + assert_eq!(pool.status().future, 3); + + // Import enough blocks to make our transactions stale + for n in 1..66 { + let header = pool.api.push_block(n, vec![], true); + block_on(pool.maintain(block_event(header))); + } + + assert_eq!(pool.status().future, 0); + assert_eq!(pool.status().ready, 0); +} diff --git a/test-utils/runtime/transaction-pool/src/lib.rs b/test-utils/runtime/transaction-pool/src/lib.rs index bcba2fb6e6781..91f26b1921cec 100644 --- a/test-utils/runtime/transaction-pool/src/lib.rs +++ b/test-utils/runtime/transaction-pool/src/lib.rs @@ -23,7 +23,7 @@ use codec::Encode; use parking_lot::RwLock; use sp_runtime::{ generic::{self, BlockId}, - traits::{BlakeTwo256, Hash as HashT, Block as _, Header as _}, + traits::{BlakeTwo256, Hash as HashT, Block as BlockT, Header as _}, transaction_validity::{ TransactionValidity, ValidTransaction, TransactionValidityError, InvalidTransaction, TransactionSource, @@ -346,6 +346,24 @@ impl sc_transaction_graph::ChainApi for TestApi { .map(|b| b.extrinsics().to_vec()), })) } + + fn block_header( + &self, + at: &BlockId, + ) -> Result::Header>, Self::Error> { + Ok(match at { + BlockId::Number(num) => self.chain + .read() + .block_by_number + .get(num) + .map(|b| b[0].0.header().clone()), + BlockId::Hash(hash) => self.chain + .read() + .block_by_hash + .get(hash) + .map(|b| b.header().clone()), + }) + } } impl sp_blockchain::HeaderMetadata for TestApi {