diff --git a/prdoc/pr_7545.prdoc b/prdoc/pr_7545.prdoc new file mode 100644 index 0000000000000..6956b09947fca --- /dev/null +++ b/prdoc/pr_7545.prdoc @@ -0,0 +1,9 @@ +title: '`fatxpool`: event streams moved to view domain' +doc: +- audience: Node Dev + description: |- + This pull request refactors the transaction pool `graph` module by renaming components for better clarity and decouples `graph` module from `view` module related specifics. + This PR does not introduce changes in the logic. +crates: +- name: sc-transaction-pool + bump: minor diff --git a/substrate/client/transaction-pool/benches/basics.rs b/substrate/client/transaction-pool/benches/basics.rs index 5ba9dd40c1568..74dd69a8aaf8d 100644 --- a/substrate/client/transaction-pool/benches/basics.rs +++ b/substrate/client/transaction-pool/benches/basics.rs @@ -151,7 +151,7 @@ fn uxt(transfer: TransferData) -> Extrinsic { ExtrinsicBuilder::new_bench_call(transfer).build() } -fn bench_configured(pool: Pool, number: u64, api: Arc) { +fn bench_configured(pool: Pool, number: u64, api: Arc) { let source = TimedTransactionSource::new_external(false); let mut futures = Vec::new(); let mut tags = Vec::new(); diff --git a/substrate/client/transaction-pool/src/common/tests.rs b/substrate/client/transaction-pool/src/common/tests.rs index 7f2cbe24d8ef6..c391beb21b07f 100644 --- a/substrate/client/transaction-pool/src/common/tests.rs +++ b/substrate/client/transaction-pool/src/common/tests.rs @@ -18,7 +18,7 @@ //! Testing related primitives for internal usage in this crate. -use crate::graph::{BlockHash, ChainApi, ExtrinsicFor, NumberFor, Pool, RawExtrinsicFor}; +use crate::graph::{BlockHash, ChainApi, ExtrinsicFor, NumberFor, RawExtrinsicFor}; use codec::Encode; use parking_lot::Mutex; use sc_transaction_pool_api::error; @@ -36,6 +36,8 @@ use substrate_test_runtime::{ ExtrinsicBuilder, Hashing, RuntimeCall, Transfer, TransferData, H256, }; +type Pool = crate::graph::Pool; + pub(crate) const INVALID_NONCE: u64 = 254; /// Test api that implements [`ChainApi`]. diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/dropped_watcher.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/dropped_watcher.rs index e04c826a1d522..91237910adc1f 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/dropped_watcher.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/dropped_watcher.rs @@ -81,7 +81,8 @@ pub enum DroppedReason { } /// Dropped-logic related event from the single view. -pub type ViewStreamEvent = crate::graph::TransactionStatusEvent, BlockHash>; +pub type ViewStreamEvent = + crate::fork_aware_txpool::view::TransactionStatusEvent, BlockHash>; /// Dropped-logic stream of events coming from the single view. type ViewStream = Pin> + Send>>; diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs index 5b43d900848ab..c21e0b8df6ff5 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs @@ -478,7 +478,7 @@ where }; if let Ok((Some(best_tree_route), Some(best_view))) = best_result { - let tmp_view: View = + let (tmp_view, _, _): (View, _, _) = View::new_from_other(&best_view, &HashAndNumber { hash: at, number: block_number }); let mut all_extrinsics = vec![]; @@ -1085,26 +1085,28 @@ where ?tree_route, "build_new_view" ); - let mut view = if let Some(origin_view) = origin_view { - let mut view = View::new_from_other(&origin_view, at); - if !tree_route.retracted().is_empty() { - view.pool.clear_recently_pruned(); - } - view - } else { - debug!( - target: LOG_TARGET, - ?at, - "creating non-cloned view" - ); - View::new( - self.api.clone(), - at.clone(), - self.options.clone(), - self.metrics.clone(), - self.is_validator.clone(), - ) - }; + let (mut view, view_dropped_stream, view_aggregated_stream) = + if let Some(origin_view) = origin_view { + let (mut view, view_dropped_stream, view_aggragated_stream) = + View::new_from_other(&origin_view, at); + if !tree_route.retracted().is_empty() { + view.pool.clear_recently_pruned(); + } + (view, view_dropped_stream, view_aggragated_stream) + } else { + debug!( + target: LOG_TARGET, + ?at, + "creating non-cloned view" + ); + View::new( + self.api.clone(), + at.clone(), + self.options.clone(), + self.metrics.clone(), + self.is_validator.clone(), + ) + }; let start = Instant::now(); // 1. Capture all import notification from the very beginning, so first register all @@ -1114,15 +1116,13 @@ where view.pool.validated_pool().import_notification_stream().boxed(), ); - self.view_store.dropped_stream_controller.add_view( - view.at.hash, - view.pool.validated_pool().create_dropped_by_limits_stream().boxed(), - ); + self.view_store + .dropped_stream_controller + .add_view(view.at.hash, view_dropped_stream.boxed()); - self.view_store.listener.add_view_aggregated_stream( - view.at.hash, - view.pool.validated_pool().create_aggregated_stream().boxed(), - ); + self.view_store + .listener + .add_view_aggregated_stream(view.at.hash, view_aggregated_stream.boxed()); // sync the transactions statuses and referencing views in all the listeners with newly // cloned view. view.pool.validated_pool().retrigger_notifications(); diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/mod.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/mod.rs index 2c4da0182a252..fce2d4ad6b27e 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/mod.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/mod.rs @@ -323,7 +323,7 @@ //! [`MultiViewListener`]: crate::fork_aware_txpool::multi_view_listener::MultiViewListener //! [`Pool`]: crate::graph::Pool //! [`Watcher`]: crate::graph::watcher::Watcher -//! [`AggregatedStream`]: crate::graph::AggregatedStream +//! [`AggregatedStream`]: crate::fork_aware_txpool::view::AggregatedStream //! [`Options`]: crate::graph::Options //! [`vp::import_notification_stream`]: ../graph/validated_pool/struct.ValidatedPool.html#method.import_notification_stream //! [`vp::enforce_limits`]: ../graph/validated_pool/struct.ValidatedPool.html#method.enforce_limits diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/multi_view_listener.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/multi_view_listener.rs index 62c4320e5d353..5216f494ffa55 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/multi_view_listener.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/multi_view_listener.rs @@ -22,8 +22,8 @@ use crate::{ common::tracing_log_xt::log_xt_trace, - fork_aware_txpool::stream_map_util::next_event, - graph::{self, BlockHash, ExtrinsicHash, TransactionStatusEvent}, + fork_aware_txpool::{stream_map_util::next_event, view::TransactionStatusEvent}, + graph::{self, BlockHash, ExtrinsicHash}, LOG_TARGET, }; use futures::{Future, FutureExt, Stream, StreamExt}; diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/revalidation_worker.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/revalidation_worker.rs index 2f3d31d0e6fde..24f71982c7816 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/revalidation_worker.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/revalidation_worker.rs @@ -211,13 +211,9 @@ mod tests { let api = Arc::new(TestApi::default()); let block0 = api.expect_hash_and_number(0); - let view = Arc::new(View::new( - api.clone(), - block0, - Default::default(), - Default::default(), - false.into(), - )); + let view = Arc::new( + View::new(api.clone(), block0, Default::default(), Default::default(), false.into()).0, + ); let queue = Arc::new(RevalidationQueue::new()); let uxt = uxt(Transfer { diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/view.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/view.rs index 4fa83ccc79bfa..348108e24dcfc 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/view.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/view.rs @@ -27,14 +27,16 @@ use super::metrics::MetricsLink as PrometheusMetrics; use crate::{ common::tracing_log_xt::log_xt_trace, graph::{ - self, base_pool::TimedTransactionSource, ExtrinsicFor, ExtrinsicHash, IsValidator, - TransactionFor, ValidatedPoolSubmitOutcome, ValidatedTransaction, ValidatedTransactionFor, + self, base_pool::TimedTransactionSource, BlockHash, ExtrinsicFor, ExtrinsicHash, + IsValidator, TransactionFor, ValidatedPoolSubmitOutcome, ValidatedTransaction, + ValidatedTransactionFor, }, LOG_TARGET, }; use indexmap::IndexMap; use parking_lot::Mutex; -use sc_transaction_pool_api::{error::Error as TxPoolError, PoolStatus}; +use sc_transaction_pool_api::{error::Error as TxPoolError, PoolStatus, TransactionStatus}; +use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; use sp_blockchain::HashAndNumber; use sp_runtime::{ generic::BlockId, traits::Block as BlockT, transaction_validity::TransactionValidityError, @@ -109,13 +111,139 @@ impl FinishRevalidationWorkerChannels { } } +/// Single event used in aggregated stream. Tuple containing hash of transactions and its status. +pub(super) type TransactionStatusEvent = (H, TransactionStatus); +/// Warning threshold for (unbounded) channel used in aggregated view's streams. +const VIEW_STREAM_WARN_THRESHOLD: usize = 100_000; + +/// Stream of events providing statuses of all the transactions within the pool. +pub(super) type AggregatedStream = TracingUnboundedReceiver>; + +/// Type alias for a stream of events intended to track dropped transactions. +type DroppedMonitoringStream = TracingUnboundedReceiver>; + +/// Notification handler for transactions updates triggered in `ValidatedPool`. +/// +/// `ViewPoolObserver` handles transaction status changes notifications coming from an instance of +/// validated pool associated with the `View` and forwards them through specified channels +/// into the View's streams. +pub(super) struct ViewPoolObserver { + /// The sink used to notify dropped by enforcing limits or by being usurped, or invalid + /// transactions. + /// + /// Note: Ready and future statuses are alse communicated through this channel, enabling the + /// stream consumer to track views that reference the transaction. + dropped_stream_sink: TracingUnboundedSender< + TransactionStatusEvent, BlockHash>, + >, + + /// The sink of the single, merged stream providing updates for all the transactions in the + /// associated pool. + /// + /// Note: some of the events which are currently ignored on the other side of this channel + /// (external watcher) are not relayed. + aggregated_stream_sink: TracingUnboundedSender< + TransactionStatusEvent, BlockHash>, + >, +} + +impl graph::EventHandler for ViewPoolObserver { + // note: skipped, notified by ForkAwareTxPool directly to multi view listener. + fn broadcasted(&self, _: ExtrinsicHash, _: Vec) {} + fn dropped(&self, _: ExtrinsicHash) {} + fn finalized(&self, _: ExtrinsicHash, _: BlockHash, _: usize) {} + fn retracted(&self, _: ExtrinsicHash, _: BlockHash) { + // note: [#5479], we do not send to aggregated stream. + } + + fn ready(&self, tx: ExtrinsicHash) { + let status = TransactionStatus::Ready; + self.send_to_dropped_stream_sink(tx, status.clone()); + self.send_to_aggregated_stream_sink(tx, status); + } + + fn future(&self, tx: ExtrinsicHash) { + let status = TransactionStatus::Future; + self.send_to_dropped_stream_sink(tx, status.clone()); + self.send_to_aggregated_stream_sink(tx, status); + } + + fn limits_enforced(&self, tx: ExtrinsicHash) { + self.send_to_dropped_stream_sink(tx, TransactionStatus::Dropped); + } + + fn usurped(&self, tx: ExtrinsicHash, by: ExtrinsicHash) { + self.send_to_dropped_stream_sink(tx, TransactionStatus::Usurped(by)); + } + + fn invalid(&self, tx: ExtrinsicHash) { + self.send_to_dropped_stream_sink(tx, TransactionStatus::Invalid); + } + + fn pruned(&self, tx: ExtrinsicHash, block_hash: BlockHash, tx_index: usize) { + self.send_to_aggregated_stream_sink(tx, TransactionStatus::InBlock((block_hash, tx_index))); + } + + fn finality_timeout(&self, tx: ExtrinsicHash, hash: BlockHash) { + //todo: do we need this? [related issue: #5482] + self.send_to_aggregated_stream_sink(tx, TransactionStatus::FinalityTimeout(hash)); + } +} + +impl ViewPoolObserver { + /// Creates an instance of `ViewPoolObserver` together with associated view's streams. + /// + /// This methods creates an event handler that shall be registered in the `ValidatedPool` + /// instance associated with the view. It also creates new view's streams: + /// - a single stream intended to watch dropped transactions only. The stream can be used to + /// subscribe to events related to dropping of all extrinsics in the pool. + /// - a single merged stream for all extrinsics in the associated pool. The stream can be used + /// to subscribe to life-cycle events of all extrinsics in the pool. For fork-aware + /// pool implementation this approach seems to be more efficient than using individual + /// streams for every transaction. + fn new() -> ( + Self, + DroppedMonitoringStream, BlockHash>, + AggregatedStream, BlockHash>, + ) { + let (dropped_stream_sink, dropped_stream) = + tracing_unbounded("mpsc_txpool_watcher", VIEW_STREAM_WARN_THRESHOLD); + let (aggregated_stream_sink, aggregated_stream) = + tracing_unbounded("mpsc_txpool_aggregated_stream", VIEW_STREAM_WARN_THRESHOLD); + + (Self { dropped_stream_sink, aggregated_stream_sink }, dropped_stream, aggregated_stream) + } + + /// Sends given event to the `dropped_stream_sink`. + fn send_to_dropped_stream_sink( + &self, + tx: ExtrinsicHash, + status: TransactionStatus, BlockHash>, + ) { + if let Err(e) = self.dropped_stream_sink.unbounded_send((tx, status.clone())) { + trace!(target: LOG_TARGET, "[{:?}] dropped_sink: {:?} send message failed: {:?}", tx, status, e); + } + } + + /// Sends given event to the `aggregated_stream_sink`. + fn send_to_aggregated_stream_sink( + &self, + tx: ExtrinsicHash, + status: TransactionStatus, BlockHash>, + ) { + if let Err(e) = self.aggregated_stream_sink.unbounded_send((tx, status.clone())) { + trace!(target: LOG_TARGET, "[{:?}] aggregated_stream {:?} send message failed: {:?}", tx, status, e); + } + } +} + /// Represents the state of transaction pool for given block. /// /// Refer to [*View*](../index.html#view) section for more details on the purpose and life cycle of /// the `View`. pub(super) struct View { /// The internal pool keeping the set of ready and future transaction at the given block. - pub(super) pool: graph::Pool, + pub(super) pool: graph::Pool>, /// The hash and number of the block with which this view is associated. pub(super) at: HashAndNumber, /// Endpoints of communication channel with background worker. @@ -136,24 +264,50 @@ where options: graph::Options, metrics: PrometheusMetrics, is_validator: IsValidator, - ) -> Self { + ) -> ( + Self, + DroppedMonitoringStream, BlockHash>, + AggregatedStream, BlockHash>, + ) { metrics.report(|metrics| metrics.non_cloned_views.inc()); - Self { - pool: graph::Pool::new(options, is_validator, api), - at, - revalidation_worker_channels: Mutex::from(None), - metrics, - } + let (event_handler, dropped_stream, aggregated_stream) = ViewPoolObserver::new(); + ( + Self { + pool: graph::Pool::new_with_event_handler( + options, + is_validator, + api, + event_handler, + ), + at, + revalidation_worker_channels: Mutex::from(None), + metrics, + }, + dropped_stream, + aggregated_stream, + ) } /// Creates a copy of the other view. - pub(super) fn new_from_other(&self, at: &HashAndNumber) -> Self { - View { - at: at.clone(), - pool: self.pool.deep_clone(), - revalidation_worker_channels: Mutex::from(None), - metrics: self.metrics.clone(), - } + pub(super) fn new_from_other( + &self, + at: &HashAndNumber, + ) -> ( + Self, + DroppedMonitoringStream, BlockHash>, + AggregatedStream, BlockHash>, + ) { + let (event_handler, dropped_stream, aggregated_stream) = ViewPoolObserver::new(); + ( + View { + at: at.clone(), + pool: self.pool.deep_clone_with_event_handler(event_handler), + revalidation_worker_channels: Mutex::from(None), + metrics: self.metrics.clone(), + }, + dropped_stream, + aggregated_stream, + ) } /// Imports single unvalidated extrinsic into the view. @@ -504,7 +658,10 @@ where listener_action: F, ) -> Vec> where - F: Fn(&mut crate::graph::Listener, ExtrinsicHash), + F: Fn( + &mut crate::graph::EventDispatcher>, + ExtrinsicHash, + ), { self.pool.validated_pool().remove_subtree(hashes, listener_action) } diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs index 96cf9f7106894..a1585741839fa 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs @@ -20,7 +20,7 @@ use super::{ multi_view_listener::{MultiViewListener, TxStatusStream}, - view::View, + view::{View, ViewPoolObserver}, }; use crate::{ fork_aware_txpool::dropped_watcher::MultiViewDroppedWatcherController, @@ -62,8 +62,13 @@ where /// Helper type representing the callback allowing to trigger per-transaction events on /// `ValidatedPool`'s listener. -type RemovalListener = - Arc, ExtrinsicHash) + Send + Sync>; +type RemovalCallback = Arc< + dyn Fn( + &mut crate::graph::EventDispatcher>, + ExtrinsicHash, + ) + Send + + Sync, +>; /// Helper struct to maintain the context for pending transaction removal, executed for /// newly inserted views. @@ -74,7 +79,7 @@ where /// Hash of the transaction that will be removed, xt_hash: ExtrinsicHash, /// Action that shall be executed on underlying `ValidatedPool`'s listener. - listener_action: RemovalListener, + listener_action: RemovalCallback, } /// This enum represents an action that should be executed on the newly built @@ -119,7 +124,7 @@ where /// Creates new unprocessed instance of pending transaction removal. fn new_removal_action( xt_hash: ExtrinsicHash, - listener: RemovalListener, + listener: RemovalCallback, ) -> Self { Self { processed: false, @@ -876,8 +881,10 @@ where listener_action: F, ) -> Vec> where - F: Fn(&mut crate::graph::Listener, ExtrinsicHash) - + Clone + F: Fn( + &mut crate::graph::EventDispatcher>, + ExtrinsicHash, + ) + Clone + Send + Sync + 'static, diff --git a/substrate/client/transaction-pool/src/graph/listener.rs b/substrate/client/transaction-pool/src/graph/listener.rs index 340b6d429ae7e..a40a708edaf05 100644 --- a/substrate/client/transaction-pool/src/graph/listener.rs +++ b/substrate/client/transaction-pool/src/graph/listener.rs @@ -20,59 +20,91 @@ use std::{collections::HashMap, fmt::Debug, hash}; use linked_hash_map::LinkedHashMap; use log::trace; -use sc_transaction_pool_api::TransactionStatus; -use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; -use serde::Serialize; -use sp_runtime::traits; use super::{watcher, BlockHash, ChainApi, ExtrinsicHash}; static LOG_TARGET: &str = "txpool::watcher"; -/// Single event used in aggregated stream. Tuple containing hash of transactions and its status. -pub type TransactionStatusEvent = (H, TransactionStatus); -/// Stream of events providing statuses of all the transactions within the pool. -pub type AggregatedStream = TracingUnboundedReceiver>; +/// The `EventHandler` trait provides a mechanism for clients to respond to various +/// transaction-related events. It offers a set of callback methods that are invoked by the +/// transaction pool's event dispatcher to notify about changes in the status of transactions. +/// +/// This trait can be implemented by any component that needs to respond to transaction lifecycle +/// events, enabling custom logic and handling of these events. +pub trait EventHandler { + /// Called when a transaction is broadcasted. + fn broadcasted(&self, _hash: ExtrinsicHash, _peers: Vec) {} -/// Warning threshold for (unbounded) channel used in aggregated stream. -const AGGREGATED_STREAM_WARN_THRESHOLD: usize = 100_000; + /// Called when a transaction is ready for execution. + fn ready(&self, _tx: ExtrinsicHash) {} -/// Extrinsic pool default listener. -pub struct Listener { + /// Called when a transaction is deemed to be executable in the future. + fn future(&self, _tx: ExtrinsicHash) {} + + /// Called when transaction pool limits result in a transaction being affected. + fn limits_enforced(&self, _tx: ExtrinsicHash) {} + + /// Called when a transaction is replaced by another. + fn usurped(&self, _tx: ExtrinsicHash, _by: ExtrinsicHash) {} + + /// Called when a transaction is dropped from the pool. + fn dropped(&self, _tx: ExtrinsicHash) {} + + /// Called when a transaction is found to be invalid. + fn invalid(&self, _tx: ExtrinsicHash) {} + + /// Called when a transaction was pruned from the pool due to its presence in imported block. + fn pruned(&self, _tx: ExtrinsicHash, _block_hash: BlockHash, _tx_index: usize) {} + + /// Called when a transaction is retracted from inclusion in a block. + fn retracted(&self, _tx: ExtrinsicHash, _block_hash: BlockHash) {} + + /// Called when a transaction has not been finalized within a timeout period. + fn finality_timeout(&self, _tx: ExtrinsicHash, _hash: BlockHash) {} + + /// Called when a transaction is finalized in a block. + fn finalized(&self, _tx: ExtrinsicHash, _block_hash: BlockHash, _tx_index: usize) {} +} + +impl EventHandler for () {} + +/// The `EventDispatcher` struct is responsible for dispatching transaction-related events from the +/// validated pool to interested observers and an optional event handler. It acts as the primary +/// liaison between the transaction pool and clients that are monitoring transaction statuses. +pub struct EventDispatcher> { /// Map containing per-transaction sinks for emitting transaction status events. watchers: HashMap>>, finality_watchers: LinkedHashMap, Vec>, - /// The sink used to notify dropped by enforcing limits or by being usurped, or invalid - /// transactions. - /// - /// Note: Ready and future statuses are alse communicated through this channel, enabling the - /// stream consumer to track views that reference the transaction. - dropped_stream_sink: Option>>>, - - /// The sink of the single, merged stream providing updates for all the transactions in the - /// associated pool. - aggregated_stream_sink: Option>>>, + /// Optional event handler (listener) that will be notified about all transactions status + /// changes from the pool. + event_handler: Option, } /// Maximum number of blocks awaiting finality at any time. const MAX_FINALITY_WATCHERS: usize = 512; -impl Default for Listener { +impl> Default + for EventDispatcher +{ fn default() -> Self { Self { watchers: Default::default(), finality_watchers: Default::default(), - dropped_stream_sink: None, - aggregated_stream_sink: None, + event_handler: None, } } } -impl Listener { - fn fire(&mut self, hash: &H, fun: F) +impl> EventDispatcher, C, L> { + /// Creates a new instance with provided event handler. + pub fn new_with_event_handler(event_handler: Option) -> Self { + Self { event_handler, ..Default::default() } + } + + fn fire(&mut self, hash: &ExtrinsicHash, fun: F) where - F: FnOnce(&mut watcher::Sender>), + F: FnOnce(&mut watcher::Sender, ExtrinsicHash>), { let clean = if let Some(h) = self.watchers.get_mut(hash) { fun(h); @@ -89,138 +121,88 @@ impl Listener watcher::Watcher> { - let sender = self.watchers.entry(hash.clone()).or_insert_with(watcher::Sender::default); + pub fn create_watcher( + &mut self, + hash: ExtrinsicHash, + ) -> watcher::Watcher, ExtrinsicHash> { + let sender = self.watchers.entry(hash).or_insert_with(watcher::Sender::default); sender.new_watcher(hash) } - /// Creates a new single stream intended to watch dropped transactions only. - /// - /// The stream can be used to subscribe to events related to dropping of all extrinsics in the - /// pool. - pub fn create_dropped_by_limits_stream(&mut self) -> AggregatedStream> { - let (sender, single_stream) = - tracing_unbounded("mpsc_txpool_watcher", AGGREGATED_STREAM_WARN_THRESHOLD); - self.dropped_stream_sink = Some(sender); - single_stream - } - - /// Creates a new single merged stream for all extrinsics in the associated pool. - /// - /// The stream can be used to subscribe to life-cycle events of all extrinsics in the pool. For - /// some implementations (e.g. fork-aware pool) this approach may be more efficient than using - /// individual streams for every transaction. - /// - /// Note: some of the events which are currently ignored on the other side of this channel - /// (external watcher) are not sent. - pub fn create_aggregated_stream(&mut self) -> AggregatedStream> { - let (sender, aggregated_stream) = - tracing_unbounded("mpsc_txpool_aggregated_stream", AGGREGATED_STREAM_WARN_THRESHOLD); - self.aggregated_stream_sink = Some(sender); - aggregated_stream - } - /// Notify the listeners about the extrinsic broadcast. - pub fn broadcasted(&mut self, hash: &H, peers: Vec) { + pub fn broadcasted(&mut self, hash: &ExtrinsicHash, peers: Vec) { trace!(target: LOG_TARGET, "[{:?}] Broadcasted", hash); - self.fire(hash, |watcher| watcher.broadcast(peers)); - } - - /// Sends given event to the `dropped_stream_sink`. - fn send_to_dropped_stream_sink(&mut self, tx: &H, status: TransactionStatus>) { - if let Some(ref sink) = self.dropped_stream_sink { - if let Err(e) = sink.unbounded_send((tx.clone(), status.clone())) { - trace!(target: LOG_TARGET, "[{:?}] dropped_sink: {:?} send message failed: {:?}", tx, status, e); - } - } - } - - /// Sends given event to the `aggregated_stream_sink`. - fn send_to_aggregated_stream_sink( - &mut self, - tx: &H, - status: TransactionStatus>, - ) { - if let Some(ref sink) = self.aggregated_stream_sink { - if let Err(e) = sink.unbounded_send((tx.clone(), status.clone())) { - trace!(target: LOG_TARGET, "[{:?}] aggregated_stream {:?} send message failed: {:?}", tx, status, e); - } - } + self.fire(hash, |watcher| watcher.broadcast(peers.clone())); + self.event_handler.as_ref().map(|l| l.broadcasted(*hash, peers)); } /// New transaction was added to the ready pool or promoted from the future pool. - pub fn ready(&mut self, tx: &H, old: Option<&H>) { - trace!(target: LOG_TARGET, "[{:?}] Ready (replaced with {:?})", tx, old); + pub fn ready(&mut self, tx: &ExtrinsicHash, old: Option<&ExtrinsicHash>) { + trace!(target: LOG_TARGET, "[{:?}] Ready (replaced with {:?})", *tx, old); self.fire(tx, |watcher| watcher.ready()); if let Some(old) = old { - self.fire(old, |watcher| watcher.usurped(tx.clone())); + self.fire(old, |watcher| watcher.usurped(*tx)); } - self.send_to_dropped_stream_sink(tx, TransactionStatus::Ready); - self.send_to_aggregated_stream_sink(tx, TransactionStatus::Ready); + self.event_handler.as_ref().map(|l| l.ready(*tx)); } /// New transaction was added to the future pool. - pub fn future(&mut self, tx: &H) { + pub fn future(&mut self, tx: &ExtrinsicHash) { trace!(target: LOG_TARGET, "[{:?}] Future", tx); self.fire(tx, |watcher| watcher.future()); - self.send_to_dropped_stream_sink(tx, TransactionStatus::Future); - self.send_to_aggregated_stream_sink(tx, TransactionStatus::Future); + self.event_handler.as_ref().map(|l| l.future(*tx)); } /// Transaction was dropped from the pool because of enforcing the limit. - pub fn limits_enforced(&mut self, tx: &H) { + pub fn limits_enforced(&mut self, tx: &ExtrinsicHash) { trace!(target: LOG_TARGET, "[{:?}] Dropped (limits enforced)", tx); self.fire(tx, |watcher| watcher.limit_enforced()); - self.send_to_dropped_stream_sink(tx, TransactionStatus::Dropped); + self.event_handler.as_ref().map(|l| l.limits_enforced(*tx)); } /// Transaction was replaced with other extrinsic. - pub fn usurped(&mut self, tx: &H, by: &H) { + pub fn usurped(&mut self, tx: &ExtrinsicHash, by: &ExtrinsicHash) { trace!(target: LOG_TARGET, "[{:?}] Dropped (replaced with {:?})", tx, by); - self.fire(tx, |watcher| watcher.usurped(by.clone())); + self.fire(tx, |watcher| watcher.usurped(*by)); - self.send_to_dropped_stream_sink(tx, TransactionStatus::Usurped(by.clone())); + self.event_handler.as_ref().map(|l| l.usurped(*tx, *by)); } /// Transaction was dropped from the pool because of the failure during the resubmission of /// revalidate transactions or failure during pruning tags. - pub fn dropped(&mut self, tx: &H) { + pub fn dropped(&mut self, tx: &ExtrinsicHash) { trace!(target: LOG_TARGET, "[{:?}] Dropped", tx); self.fire(tx, |watcher| watcher.dropped()); + self.event_handler.as_ref().map(|l| l.dropped(*tx)); } /// Transaction was removed as invalid. - pub fn invalid(&mut self, tx: &H) { + pub fn invalid(&mut self, tx: &ExtrinsicHash) { trace!(target: LOG_TARGET, "[{:?}] Extrinsic invalid", tx); self.fire(tx, |watcher| watcher.invalid()); - - self.send_to_dropped_stream_sink(tx, TransactionStatus::Invalid); + self.event_handler.as_ref().map(|l| l.invalid(*tx)); } /// Transaction was pruned from the pool. - pub fn pruned(&mut self, block_hash: BlockHash, tx: &H) { + pub fn pruned(&mut self, block_hash: BlockHash, tx: &ExtrinsicHash) { trace!(target: LOG_TARGET, "[{:?}] Pruned at {:?}", tx, block_hash); // Get the transactions included in the given block hash. let txs = self.finality_watchers.entry(block_hash).or_insert(vec![]); - txs.push(tx.clone()); + txs.push(*tx); // Current transaction is the last one included. let tx_index = txs.len() - 1; self.fire(tx, |watcher| watcher.in_block(block_hash, tx_index)); - self.send_to_aggregated_stream_sink(tx, TransactionStatus::InBlock((block_hash, tx_index))); + self.event_handler.as_ref().map(|l| l.pruned(*tx, block_hash, tx_index)); while self.finality_watchers.len() > MAX_FINALITY_WATCHERS { if let Some((hash, txs)) = self.finality_watchers.pop_front() { for tx in txs { self.fire(&tx, |watcher| watcher.finality_timeout(hash)); - //todo: do we need this? [related issue: #5482] - self.send_to_aggregated_stream_sink( - &tx, - TransactionStatus::FinalityTimeout(hash), - ); + self.event_handler.as_ref().map(|l| l.finality_timeout(tx, block_hash)); } } } @@ -231,7 +213,7 @@ impl Listener Listener impl Iterator { + pub fn watched_transactions(&self) -> impl Iterator> { self.watchers.keys() } } diff --git a/substrate/client/transaction-pool/src/graph/mod.rs b/substrate/client/transaction-pool/src/graph/mod.rs index c3161799785a9..3e6a63babc99c 100644 --- a/substrate/client/transaction-pool/src/graph/mod.rs +++ b/substrate/client/transaction-pool/src/graph/mod.rs @@ -42,13 +42,12 @@ pub use self::pool::{ TransactionFor, ValidatedTransactionFor, }; pub use validated_pool::{ - BaseSubmitOutcome, IsValidator, Listener, ValidatedPoolSubmitOutcome, ValidatedTransaction, + BaseSubmitOutcome, EventDispatcher, IsValidator, ValidatedPoolSubmitOutcome, + ValidatedTransaction, }; pub(crate) use self::pool::CheckBannedBeforeVerify; -pub(crate) use listener::TransactionStatusEvent; +pub(crate) use listener::EventHandler; -#[cfg(doc)] -pub(crate) use listener::AggregatedStream; #[cfg(doc)] pub(crate) use validated_pool::ValidatedPool; diff --git a/substrate/client/transaction-pool/src/graph/pool.rs b/substrate/client/transaction-pool/src/graph/pool.rs index d938e9bf06e7d..afd46617b366c 100644 --- a/substrate/client/transaction-pool/src/graph/pool.rs +++ b/substrate/client/transaction-pool/src/graph/pool.rs @@ -36,7 +36,7 @@ use std::{ use super::{ base_pool as base, validated_pool::{IsValidator, ValidatedPool, ValidatedTransaction}, - ValidatedPoolSubmitOutcome, + EventHandler, ValidatedPoolSubmitOutcome, }; /// Modification notification event stream type; @@ -173,11 +173,11 @@ pub(crate) enum CheckBannedBeforeVerify { } /// Extrinsics pool that performs validation. -pub struct Pool { - validated_pool: Arc>, +pub struct Pool> { + validated_pool: Arc>, } -impl Pool { +impl> Pool { /// Create a new transaction pool with statically sized rotator. pub fn new_with_staticly_sized_rotator( options: Options, @@ -198,6 +198,23 @@ impl Pool { Self { validated_pool: Arc::new(ValidatedPool::new(options, is_validator, api)) } } + /// Create a new transaction pool. + pub fn new_with_event_handler( + options: Options, + is_validator: IsValidator, + api: Arc, + event_handler: L, + ) -> Self { + Self { + validated_pool: Arc::new(ValidatedPool::new_with_event_handler( + options, + is_validator, + api, + event_handler, + )), + } + } + /// Imports a bunch of unverified extrinsics to the pool pub async fn submit_at( &self, @@ -481,7 +498,7 @@ impl Pool { } /// Get a reference to the underlying validated pool. - pub fn validated_pool(&self) -> &ValidatedPool { + pub fn validated_pool(&self) -> &ValidatedPool { &self.validated_pool } @@ -491,12 +508,13 @@ impl Pool { } } -impl Pool { +impl> Pool { /// Deep clones the pool. /// /// Must be called on purpose: it duplicates all the internal structures. - pub fn deep_clone(&self) -> Self { - let other: ValidatedPool = (*self.validated_pool).clone(); + pub fn deep_clone_with_event_handler(&self, event_handler: L) -> Self { + let other: ValidatedPool = + self.validated_pool().deep_clone_with_event_handler(event_handler); Self { validated_pool: Arc::from(other) } } } @@ -519,6 +537,8 @@ mod tests { const SOURCE: TimedTransactionSource = TimedTransactionSource { source: TransactionSource::External, timestamp: None }; + type Pool = super::Pool; + #[test] fn should_validate_and_import_transaction() { // given diff --git a/substrate/client/transaction-pool/src/graph/validated_pool.rs b/substrate/client/transaction-pool/src/graph/validated_pool.rs index 174b69da7611b..8eb967421bd70 100644 --- a/substrate/client/transaction-pool/src/graph/validated_pool.rs +++ b/substrate/client/transaction-pool/src/graph/validated_pool.rs @@ -35,6 +35,7 @@ use std::time::Instant; use super::{ base_pool::{self as base, PruneStatus}, + listener::EventHandler, pool::{ BlockHash, ChainApi, EventStream, ExtrinsicFor, ExtrinsicHash, Options, TransactionFor, }, @@ -91,8 +92,8 @@ impl ValidatedTransaction { pub type ValidatedTransactionFor = ValidatedTransaction, ExtrinsicFor, ::Error>; -/// A type alias representing ValidatedPool listener for given ChainApi type. -pub type Listener = super::listener::Listener, B>; +/// A type alias representing ValidatedPool event dispatcher for given ChainApi type. +pub type EventDispatcher = super::listener::EventDispatcher, B, L>; /// A closure that returns true if the local node is a validator that can author blocks. #[derive(Clone)] @@ -155,23 +156,23 @@ impl BaseSubmitOutcome { } /// Pool that deals with validated transactions. -pub struct ValidatedPool { +pub struct ValidatedPool> { api: Arc, is_validator: IsValidator, options: Options, - listener: RwLock>, + event_dispatcher: RwLock>, pub(crate) pool: RwLock, ExtrinsicFor>>, import_notification_sinks: Mutex>>>, rotator: PoolRotator>, } -impl Clone for ValidatedPool { +impl> Clone for ValidatedPool { fn clone(&self) -> Self { Self { api: self.api.clone(), is_validator: self.is_validator.clone(), options: self.options.clone(), - listener: Default::default(), + event_dispatcher: Default::default(), pool: RwLock::from(self.pool.read().clone()), import_notification_sinks: Default::default(), rotator: self.rotator.clone(), @@ -179,7 +180,16 @@ impl Clone for ValidatedPool { } } -impl ValidatedPool { +impl> ValidatedPool { + pub fn deep_clone_with_event_handler(&self, event_handler: L) -> Self { + Self { + event_dispatcher: RwLock::new(EventDispatcher::new_with_event_handler(Some( + event_handler, + ))), + ..self.clone() + } + } + /// Create a new transaction pool with statically sized rotator. pub fn new_with_staticly_sized_rotator( options: Options, @@ -187,7 +197,7 @@ impl ValidatedPool { api: Arc, ) -> Self { let ban_time = options.ban_time; - Self::new_with_rotator(options, is_validator, api, PoolRotator::new(ban_time)) + Self::new_with_rotator(options, is_validator, api, PoolRotator::new(ban_time), None) } /// Create a new transaction pool. @@ -199,6 +209,25 @@ impl ValidatedPool { is_validator, api, PoolRotator::new_with_expected_size(ban_time, total_count), + None, + ) + } + + /// Create a new transaction pool with given event handler. + pub fn new_with_event_handler( + options: Options, + is_validator: IsValidator, + api: Arc, + event_handler: L, + ) -> Self { + let ban_time = options.ban_time; + let total_count = options.total_count(); + Self::new_with_rotator( + options, + is_validator, + api, + PoolRotator::new_with_expected_size(ban_time, total_count), + Some(event_handler), ) } @@ -207,12 +236,13 @@ impl ValidatedPool { is_validator: IsValidator, api: Arc, rotator: PoolRotator>, + event_handler: Option, ) -> Self { let base_pool = base::BasePool::new(options.reject_future_transactions); Self { is_validator, options, - listener: Default::default(), + event_dispatcher: RwLock::new(EventDispatcher::new_with_event_handler(event_handler)), api, pool: RwLock::new(base_pool), import_notification_sinks: Default::default(), @@ -309,8 +339,8 @@ impl ValidatedPool { }); } - let mut listener = self.listener.write(); - fire_events(&mut *listener, &imported); + let mut event_dispatcher = self.event_dispatcher.write(); + fire_events(&mut *event_dispatcher, &imported); Ok(ValidatedPoolSubmitOutcome::new(*imported.hash(), Some(priority))) }, ValidatedTransaction::Invalid(hash, err) => { @@ -320,7 +350,7 @@ impl ValidatedPool { }, ValidatedTransaction::Unknown(hash, err) => { log::trace!(target: LOG_TARGET, "[{:?}] ValidatedPool::submit_one unknown {:?}", hash, err); - self.listener.write().invalid(&hash); + self.event_dispatcher.write().invalid(&hash); Err(err) }, } @@ -360,9 +390,9 @@ impl ValidatedPool { } // run notifications - let mut listener = self.listener.write(); + let mut event_dispatcher = self.event_dispatcher.write(); for h in &removed { - listener.limits_enforced(h); + event_dispatcher.limits_enforced(h); } removed @@ -398,12 +428,12 @@ impl ValidatedPool { &self, tx_hash: ExtrinsicHash, ) -> Watcher, ExtrinsicHash> { - self.listener.write().create_watcher(tx_hash) + self.event_dispatcher.write().create_watcher(tx_hash) } /// Provides a list of hashes for all watched transactions in the pool. pub fn watched_transactions(&self) -> Vec> { - self.listener.read().watched_transactions().map(Clone::clone).collect() + self.event_dispatcher.read().watched_transactions().map(Clone::clone).collect() } /// Resubmits revalidated transactions back to the pool. @@ -528,15 +558,15 @@ impl ValidatedPool { }; // and now let's notify listeners about status changes - let mut listener = self.listener.write(); + let mut event_dispatcher = self.event_dispatcher.write(); for (hash, final_status) in final_statuses { let initial_status = initial_statuses.remove(&hash); if initial_status.is_none() || Some(final_status) != initial_status { match final_status { - Status::Future => listener.future(&hash), - Status::Ready => listener.ready(&hash, None), - Status::Dropped => listener.dropped(&hash), - Status::Failed => listener.invalid(&hash), + Status::Future => event_dispatcher.future(&hash), + Status::Ready => event_dispatcher.ready(&hash, None), + Status::Dropped => event_dispatcher.dropped(&hash), + Status::Failed => event_dispatcher.invalid(&hash), } } } @@ -569,12 +599,12 @@ impl ValidatedPool { // Notify event listeners of all transactions // that were promoted to `Ready` or were dropped. { - let mut listener = self.listener.write(); + let mut event_dispatcher = self.event_dispatcher.write(); for promoted in &status.promoted { - fire_events(&mut *listener, promoted); + fire_events(&mut *event_dispatcher, promoted); } for f in &status.failed { - listener.dropped(f); + event_dispatcher.dropped(f); } } @@ -618,13 +648,13 @@ impl ValidatedPool { at: &HashAndNumber, hashes: impl Iterator>, ) { - let mut listener = self.listener.write(); + let mut event_dispatcher = self.event_dispatcher.write(); let mut set = HashSet::with_capacity(hashes.size_hint().0); for h in hashes { // `hashes` has possibly duplicate hashes. // we'd like to send out the `InBlock` notification only once. if !set.contains(&h) { - listener.pruned(at.hash, &h); + event_dispatcher.pruned(at.hash, &h); set.insert(h); } } @@ -681,9 +711,9 @@ impl ValidatedPool { /// Invoked when extrinsics are broadcasted. pub fn on_broadcasted(&self, propagated: HashMap, Vec>) { - let mut listener = self.listener.write(); + let mut event_dispatcher = self.event_dispatcher.write(); for (hash, peers) in propagated.into_iter() { - listener.broadcasted(&hash, peers); + event_dispatcher.broadcasted(&hash, peers); } } @@ -736,27 +766,13 @@ impl ValidatedPool { "Attempting to notify watchers of finalization for {}", block_hash, ); - self.listener.write().finalized(block_hash); + self.event_dispatcher.write().finalized(block_hash); Ok(()) } - /// Notify the listener of retracted blocks + /// Notify the event_dispatcher of retracted blocks pub fn on_block_retracted(&self, block_hash: BlockHash) { - self.listener.write().retracted(block_hash) - } - - /// Refer to [`Listener::create_dropped_by_limits_stream`] for details. - pub fn create_dropped_by_limits_stream( - &self, - ) -> super::listener::AggregatedStream, BlockHash> { - self.listener.write().create_dropped_by_limits_stream() - } - - /// Refer to [`Listener::create_aggregated_stream`] - pub fn create_aggregated_stream( - &self, - ) -> super::listener::AggregatedStream, BlockHash> { - self.listener.write().create_aggregated_stream() + self.event_dispatcher.write().retracted(block_hash) } /// Resends ready and future events for all the ready and future transactions that are already @@ -765,12 +781,12 @@ impl ValidatedPool { /// Intended to be called after cloning the instance of `ValidatedPool`. pub fn retrigger_notifications(&self) { let pool = self.pool.read(); - let mut listener = self.listener.write(); + let mut event_dispatcher = self.event_dispatcher.write(); pool.ready().for_each(|r| { - listener.ready(&r.hash, None); + event_dispatcher.ready(&r.hash, None); }); pool.futures().for_each(|f| { - listener.future(&f.hash); + event_dispatcher.future(&f.hash); }); } @@ -782,21 +798,21 @@ impl ValidatedPool { /// The root transaction will be banned from re-entrering the pool. Descendant transactions may /// be re-submitted to the pool if required. /// - /// A `listener_action` callback function is invoked for every transaction that is removed, - /// providing a reference to the pool's listener and the hash of the removed transaction. This - /// allows to trigger the required events. + /// A `event_disaptcher_action` callback function is invoked for every transaction that is + /// removed, providing a reference to the pool's event dispatcher and the hash of the removed + /// transaction. This allows to trigger the required events. /// /// Returns a vector containing the hashes of all removed transactions, including the root /// transaction specified by `tx_hash`. pub fn remove_subtree( &self, hashes: &[ExtrinsicHash], - listener_action: F, + event_dispatcher_action: F, ) -> Vec> where - F: Fn(&mut Listener, ExtrinsicHash), + F: Fn(&mut EventDispatcher, ExtrinsicHash), { - // temporarily ban invalid transactions + // temporarily ban removed transactions self.rotator.ban(&Instant::now(), hashes.iter().cloned()); let removed = self.pool.write().remove_subtree(hashes); @@ -804,25 +820,28 @@ impl ValidatedPool { .into_iter() .map(|tx| { let removed_tx_hash = tx.hash; - let mut listener = self.listener.write(); - listener_action(&mut *listener, removed_tx_hash); + let mut event_dispatcher = self.event_dispatcher.write(); + event_dispatcher_action(&mut *event_dispatcher, removed_tx_hash); tx.clone() }) .collect::>() } } -fn fire_events(listener: &mut Listener, imported: &base::Imported, Ex>) -where +fn fire_events( + event_dispatcher: &mut EventDispatcher, + imported: &base::Imported, Ex>, +) where B: ChainApi, + L: EventHandler, { match *imported { base::Imported::Ready { ref promoted, ref failed, ref removed, ref hash } => { - listener.ready(hash, None); - failed.iter().for_each(|f| listener.invalid(f)); - removed.iter().for_each(|r| listener.usurped(&r.hash, hash)); - promoted.iter().for_each(|p| listener.ready(p, None)); + event_dispatcher.ready(hash, None); + failed.iter().for_each(|f| event_dispatcher.invalid(f)); + removed.iter().for_each(|r| event_dispatcher.usurped(&r.hash, hash)); + promoted.iter().for_each(|p| event_dispatcher.ready(p, None)); }, - base::Imported::Future { ref hash } => listener.future(hash), + base::Imported::Future { ref hash } => event_dispatcher.future(hash), } } diff --git a/substrate/client/transaction-pool/src/single_state_txpool/revalidation.rs b/substrate/client/transaction-pool/src/single_state_txpool/revalidation.rs index ffcade0859160..413d97b11b0e1 100644 --- a/substrate/client/transaction-pool/src/single_state_txpool/revalidation.rs +++ b/substrate/client/transaction-pool/src/single_state_txpool/revalidation.rs @@ -18,7 +18,8 @@ //! Pool periodic revalidation. -use crate::graph::{BlockHash, ChainApi, ExtrinsicHash, Pool, ValidatedTransaction}; +use crate::graph::{BlockHash, ChainApi, ExtrinsicHash, ValidatedTransaction}; +use futures::prelude::*; use indexmap::IndexMap; use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; use sp_runtime::{ @@ -28,17 +29,17 @@ use std::{ collections::{BTreeMap, HashMap, HashSet}, pin::Pin, sync::Arc, + time::Duration, }; -use futures::prelude::*; -use std::time::Duration; - const BACKGROUND_REVALIDATION_INTERVAL: Duration = Duration::from_millis(200); const MIN_BACKGROUND_REVALIDATION_BATCH_SIZE: usize = 20; const LOG_TARGET: &str = "txpool::revalidation"; +type Pool = crate::graph::Pool; + /// Payload from queue to worker. struct WorkerPayload { at: BlockHash, diff --git a/substrate/client/transaction-pool/src/single_state_txpool/single_state_txpool.rs b/substrate/client/transaction-pool/src/single_state_txpool/single_state_txpool.rs index 9f4d63f3ba3a9..745b57d0c85bf 100644 --- a/substrate/client/transaction-pool/src/single_state_txpool/single_state_txpool.rs +++ b/substrate/client/transaction-pool/src/single_state_txpool/single_state_txpool.rs @@ -29,7 +29,7 @@ use crate::{ error, log_xt::log_xt_trace, }, - graph::{self, base_pool::TimedTransactionSource, ExtrinsicHash, IsValidator}, + graph::{self, base_pool::TimedTransactionSource, EventHandler, ExtrinsicHash, IsValidator}, ReadyIteratorFor, LOG_TARGET, }; use async_trait::async_trait; @@ -64,7 +64,7 @@ where Block: BlockT, PoolApi: graph::ChainApi, { - pool: Arc>, + pool: Arc>, api: Arc, revalidation_strategy: Arc>>>, revalidation_queue: Arc>, @@ -225,7 +225,7 @@ where } /// Gets shared reference to the underlying pool. - pub fn pool(&self) -> &Arc> { + pub fn pool(&self) -> &Arc> { &self.pool } @@ -583,10 +583,14 @@ impl RevalidationStatus { } /// Prune the known txs for the given block. -pub async fn prune_known_txs_for_block>( +pub async fn prune_known_txs_for_block< + Block: BlockT, + Api: graph::ChainApi, + L: EventHandler, +>( at: &HashAndNumber, api: &Api, - pool: &graph::Pool, + pool: &graph::Pool, ) -> Vec> { let extrinsics = api .block_body(at.hash) diff --git a/substrate/client/transaction-pool/tests/pool.rs b/substrate/client/transaction-pool/tests/pool.rs index c70f454833145..1403ea06df791 100644 --- a/substrate/client/transaction-pool/tests/pool.rs +++ b/substrate/client/transaction-pool/tests/pool.rs @@ -45,6 +45,8 @@ use substrate_test_runtime_client::{ }; use substrate_test_runtime_transaction_pool::{uxt, TestApi}; +type Pool = sc_transaction_pool::Pool; + const LOG_TARGET: &str = "txpool"; fn pool() -> (Pool, Arc) {