Skip to content

Commit

Permalink
fatxpool: event streams moved to view domain (#7545)
Browse files Browse the repository at this point in the history
#### Overview

This pull request refactors the transaction pool `graph` module by
renaming components for better clarity. The `EventHandler` trait was
introduced to enhance flexibility in handling transaction lifecycle
events. Changes include renaming `graph::Listener` to
`graph::EventDispatcher` and moving certain functionalities from `graph`
to `view` module in order to decouple `graph` from `view`-related
specifics.

This PR does not introduce changes in the logic.

#### Notes for Reviewers
All the changes looks dense at first, but in fact following was done:
- The `graph::Listener` was renamed to
[`graph::EventDispatcher`](https://github.com/paritytech/polkadot-sdk/blob/515cb4042d097581ed6b4195e57b04494e385a17/substrate/client/transaction-pool/src/graph/listener.rs#L74C12-L74C27),
to better reflect its role in dispatching transaction-related events
from `ValidatedPool`. The `EventDispatcher` now utilizes the `L:
EventHandler` generic type to handle transaction status events.
- The new
[`EventHandler`](https://github.com/paritytech/polkadot-sdk/blob/515cb4042d097581ed6b4195e57b04494e385a17/substrate/client/transaction-pool/src/graph/listener.rs#L34)
trait was introduced to handle transaction lifecycle events, improving
implementation flexibility and providing clearer role descriptions
within the system. Introduction of this trait allowed the removal of
`View` related entities (e.g. streams) from the `ValidatedPool`'s event
dispatcher (previously _listener_).
- The _dropped monitoring_ and _aggregated events_ stream
[functionalities](https://github.com/paritytech/polkadot-sdk/blob/515cb4042d097581ed6b4195e57b04494e385a17/substrate/client/transaction-pool/src/fork_aware_txpool/view.rs#L157-L188)
and [related
types](https://github.com/paritytech/polkadot-sdk/blob/515cb4042d097581ed6b4195e57b04494e385a17/substrate/client/transaction-pool/src/fork_aware_txpool/view.rs#L112-L121)
were moved from `graph::listener` to the `view` module. The
[`ViewPoolObserver`](https://github.com/paritytech/polkadot-sdk/blob/515cb4042d097581ed6b4195e57b04494e385a17/substrate/client/transaction-pool/src/fork_aware_txpool/view.rs#L128C19-L128C35),
which implements `EventHandler`, now provides the implementation of
streams feeding.
- Fields, arguments, and variables previously named `listener` were
renamed to `event_dispatcher` to align with their purpose and type
naming.
- Various structs such as `Pool` and `ValidatedPool` were updated to
include a generic `L: EventHandler` across the codebase.

---------

Co-authored-by: cmd[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: Iulian Barbu <[email protected]>
  • Loading branch information
3 people authored Feb 19, 2025
1 parent ba7cb48 commit 8507e70
Show file tree
Hide file tree
Showing 17 changed files with 460 additions and 260 deletions.
9 changes: 9 additions & 0 deletions prdoc/pr_7545.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
title: '`fatxpool`: event streams moved to view domain'
doc:
- audience: Node Dev
description: |-
This pull request refactors the transaction pool `graph` module by renaming components for better clarity and decouples `graph` module from `view` module related specifics.
This PR does not introduce changes in the logic.
crates:
- name: sc-transaction-pool
bump: minor
2 changes: 1 addition & 1 deletion substrate/client/transaction-pool/benches/basics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ fn uxt(transfer: TransferData) -> Extrinsic {
ExtrinsicBuilder::new_bench_call(transfer).build()
}

fn bench_configured(pool: Pool<TestApi>, number: u64, api: Arc<TestApi>) {
fn bench_configured(pool: Pool<TestApi, ()>, number: u64, api: Arc<TestApi>) {
let source = TimedTransactionSource::new_external(false);
let mut futures = Vec::new();
let mut tags = Vec::new();
Expand Down
4 changes: 3 additions & 1 deletion substrate/client/transaction-pool/src/common/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

//! Testing related primitives for internal usage in this crate.
use crate::graph::{BlockHash, ChainApi, ExtrinsicFor, NumberFor, Pool, RawExtrinsicFor};
use crate::graph::{BlockHash, ChainApi, ExtrinsicFor, NumberFor, RawExtrinsicFor};
use codec::Encode;
use parking_lot::Mutex;
use sc_transaction_pool_api::error;
Expand All @@ -36,6 +36,8 @@ use substrate_test_runtime::{
ExtrinsicBuilder, Hashing, RuntimeCall, Transfer, TransferData, H256,
};

type Pool<Api> = crate::graph::Pool<Api, ()>;

pub(crate) const INVALID_NONCE: u64 = 254;

/// Test api that implements [`ChainApi`].
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ pub enum DroppedReason<Hash> {
}

/// Dropped-logic related event from the single view.
pub type ViewStreamEvent<C> = crate::graph::TransactionStatusEvent<ExtrinsicHash<C>, BlockHash<C>>;
pub type ViewStreamEvent<C> =
crate::fork_aware_txpool::view::TransactionStatusEvent<ExtrinsicHash<C>, BlockHash<C>>;

/// Dropped-logic stream of events coming from the single view.
type ViewStream<C> = Pin<Box<dyn futures::Stream<Item = ViewStreamEvent<C>> + Send>>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ where
};

if let Ok((Some(best_tree_route), Some(best_view))) = best_result {
let tmp_view: View<ChainApi> =
let (tmp_view, _, _): (View<ChainApi>, _, _) =
View::new_from_other(&best_view, &HashAndNumber { hash: at, number: block_number });

let mut all_extrinsics = vec![];
Expand Down Expand Up @@ -1085,26 +1085,28 @@ where
?tree_route,
"build_new_view"
);
let mut view = if let Some(origin_view) = origin_view {
let mut view = View::new_from_other(&origin_view, at);
if !tree_route.retracted().is_empty() {
view.pool.clear_recently_pruned();
}
view
} else {
debug!(
target: LOG_TARGET,
?at,
"creating non-cloned view"
);
View::new(
self.api.clone(),
at.clone(),
self.options.clone(),
self.metrics.clone(),
self.is_validator.clone(),
)
};
let (mut view, view_dropped_stream, view_aggregated_stream) =
if let Some(origin_view) = origin_view {
let (mut view, view_dropped_stream, view_aggragated_stream) =
View::new_from_other(&origin_view, at);
if !tree_route.retracted().is_empty() {
view.pool.clear_recently_pruned();
}
(view, view_dropped_stream, view_aggragated_stream)
} else {
debug!(
target: LOG_TARGET,
?at,
"creating non-cloned view"
);
View::new(
self.api.clone(),
at.clone(),
self.options.clone(),
self.metrics.clone(),
self.is_validator.clone(),
)
};

let start = Instant::now();
// 1. Capture all import notification from the very beginning, so first register all
Expand All @@ -1114,15 +1116,13 @@ where
view.pool.validated_pool().import_notification_stream().boxed(),
);

self.view_store.dropped_stream_controller.add_view(
view.at.hash,
view.pool.validated_pool().create_dropped_by_limits_stream().boxed(),
);
self.view_store
.dropped_stream_controller
.add_view(view.at.hash, view_dropped_stream.boxed());

self.view_store.listener.add_view_aggregated_stream(
view.at.hash,
view.pool.validated_pool().create_aggregated_stream().boxed(),
);
self.view_store
.listener
.add_view_aggregated_stream(view.at.hash, view_aggregated_stream.boxed());
// sync the transactions statuses and referencing views in all the listeners with newly
// cloned view.
view.pool.validated_pool().retrigger_notifications();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@
//! [`MultiViewListener`]: crate::fork_aware_txpool::multi_view_listener::MultiViewListener
//! [`Pool`]: crate::graph::Pool
//! [`Watcher`]: crate::graph::watcher::Watcher
//! [`AggregatedStream`]: crate::graph::AggregatedStream
//! [`AggregatedStream`]: crate::fork_aware_txpool::view::AggregatedStream
//! [`Options`]: crate::graph::Options
//! [`vp::import_notification_stream`]: ../graph/validated_pool/struct.ValidatedPool.html#method.import_notification_stream
//! [`vp::enforce_limits`]: ../graph/validated_pool/struct.ValidatedPool.html#method.enforce_limits
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
use crate::{
common::tracing_log_xt::log_xt_trace,
fork_aware_txpool::stream_map_util::next_event,
graph::{self, BlockHash, ExtrinsicHash, TransactionStatusEvent},
fork_aware_txpool::{stream_map_util::next_event, view::TransactionStatusEvent},
graph::{self, BlockHash, ExtrinsicHash},
LOG_TARGET,
};
use futures::{Future, FutureExt, Stream, StreamExt};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,13 +211,9 @@ mod tests {
let api = Arc::new(TestApi::default());
let block0 = api.expect_hash_and_number(0);

let view = Arc::new(View::new(
api.clone(),
block0,
Default::default(),
Default::default(),
false.into(),
));
let view = Arc::new(
View::new(api.clone(), block0, Default::default(), Default::default(), false.into()).0,
);
let queue = Arc::new(RevalidationQueue::new());

let uxt = uxt(Transfer {
Expand Down
Loading

0 comments on commit 8507e70

Please sign in to comment.