Skip to content

Commit e4c8e47

Browse files
fgimenezmattsse
andauthored
feat(op, txpool): add additional update routine for conditionals (#14497)
Co-authored-by: Matthias Seitz <[email protected]>
1 parent 9c1988b commit e4c8e47

File tree

7 files changed

+106
-11
lines changed

7 files changed

+106
-11
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/optimism/node/src/node.rs

Lines changed: 35 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,10 @@ impl OpNode {
107107
self.args;
108108
ComponentsBuilder::default()
109109
.node_types::<Node>()
110-
.pool(OpPoolBuilder::default())
110+
.pool(
111+
OpPoolBuilder::default()
112+
.with_enable_tx_conditional(self.args.enable_tx_conditional),
113+
)
111114
.payload(
112115
OpPayloadBuilder::new(compute_pending_block).with_da_config(self.da_config.clone()),
113116
)
@@ -433,20 +436,33 @@ where
433436
pub struct OpPoolBuilder<T = crate::txpool::OpPooledTransaction> {
434437
/// Enforced overrides that are applied to the pool config.
435438
pub pool_config_overrides: PoolBuilderConfigOverrides,
439+
/// Enable transaction conditionals.
440+
pub enable_tx_conditional: bool,
436441
/// Marker for the pooled transaction type.
437442
_pd: core::marker::PhantomData<T>,
438443
}
439444

440445
impl<T> Default for OpPoolBuilder<T> {
441446
fn default() -> Self {
442-
Self { pool_config_overrides: Default::default(), _pd: Default::default() }
447+
Self {
448+
pool_config_overrides: Default::default(),
449+
enable_tx_conditional: false,
450+
_pd: Default::default(),
451+
}
452+
}
453+
}
454+
455+
impl<T> OpPoolBuilder<T> {
456+
fn with_enable_tx_conditional(mut self, enable_tx_conditional: bool) -> Self {
457+
self.enable_tx_conditional = enable_tx_conditional;
458+
self
443459
}
444460
}
445461

446462
impl<Node, T> PoolBuilder<Node> for OpPoolBuilder<T>
447463
where
448464
Node: FullNodeTypes<Types: NodeTypes<ChainSpec: OpHardforks>>,
449-
T: EthPoolTransaction<Consensus = TxTy<Node::Types>>,
465+
T: EthPoolTransaction<Consensus = TxTy<Node::Types>> + MaybeConditionalTransaction,
450466
{
451467
type Pool = OpTransactionPool<Node::Provider, DiskFileBlobStore, T>;
452468

@@ -481,7 +497,7 @@ where
481497
info!(target: "reth::cli", "Transaction pool initialized");
482498
let transactions_path = data_dir.txpool_transactions();
483499

484-
// spawn txpool maintenance task
500+
// spawn txpool maintenance tasks
485501
{
486502
let pool = transaction_pool.clone();
487503
let chain_events = ctx.provider().canonical_state_stream();
@@ -500,18 +516,31 @@ where
500516
},
501517
);
502518

503-
// spawn the maintenance task
519+
// spawn the main maintenance task
504520
ctx.task_executor().spawn_critical(
505521
"txpool maintenance task",
506522
reth_transaction_pool::maintain::maintain_transaction_pool_future(
507523
client,
508-
pool,
524+
pool.clone(),
509525
chain_events,
510526
ctx.task_executor().clone(),
511527
Default::default(),
512528
),
513529
);
514530
debug!(target: "reth::cli", "Spawned txpool maintenance task");
531+
532+
if self.enable_tx_conditional {
533+
// spawn the Op txpool maintenance task
534+
let chain_events = ctx.provider().canonical_state_stream();
535+
ctx.task_executor().spawn_critical(
536+
"Op txpool maintenance task",
537+
reth_optimism_txpool::maintain::maintain_transaction_pool_future(
538+
pool,
539+
chain_events,
540+
),
541+
);
542+
debug!(target: "reth::cli", "Spawned Op txpool maintenance task");
543+
}
515544
}
516545

517546
Ok(transaction_pool)

crates/optimism/txpool/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ alloy-rpc-types-eth.workspace = true
2121
# reth
2222
reth-chainspec.workspace = true
2323
reth-primitives-traits.workspace = true
24+
reth-chain-state.workspace = true
2425
reth-storage-api.workspace = true
2526
reth-transaction-pool.workspace = true
2627

@@ -37,6 +38,7 @@ reth-optimism-primitives = { workspace = true, features = ["reth-codec"] }
3738
# misc
3839
c-kzg.workspace = true
3940
derive_more.workspace = true
41+
futures-util.workspace = true
4042
parking_lot.workspace = true
4143

4244
[dev-dependencies]

crates/optimism/txpool/src/conditional.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@ pub trait MaybeConditionalTransaction {
77
/// Attach a [`TransactionConditional`].
88
fn set_conditional(&mut self, conditional: TransactionConditional);
99

10+
/// Get attached [`TransactionConditional`] if any.
11+
fn conditional(&self) -> Option<&TransactionConditional>;
12+
1013
/// Helper that sets the conditional and returns the instance again
1114
fn with_conditional(mut self, conditional: TransactionConditional) -> Self
1215
where

crates/optimism/txpool/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ pub use validator::{OpL1BlockInfo, OpTransactionValidator};
1616
pub mod conditional;
1717
mod transaction;
1818
pub use transaction::OpPooledTransaction;
19+
pub mod maintain;
1920

2021
use reth_transaction_pool::{CoinbaseTipOrdering, Pool, TransactionValidationTaskExecutor};
2122

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
//! Support for maintaining the state of the transaction pool
2+
3+
use alloy_consensus::{conditional::BlockConditionalAttributes, BlockHeader};
4+
use futures_util::{future::BoxFuture, FutureExt, Stream, StreamExt};
5+
use reth_chain_state::CanonStateNotification;
6+
use reth_primitives_traits::NodePrimitives;
7+
use reth_transaction_pool::TransactionPool;
8+
9+
use crate::conditional::MaybeConditionalTransaction;
10+
11+
/// Returns a spawnable future for maintaining the state of the transaction pool.
12+
pub fn maintain_transaction_pool_future<N, Pool, St>(
13+
pool: Pool,
14+
events: St,
15+
) -> BoxFuture<'static, ()>
16+
where
17+
N: NodePrimitives,
18+
Pool: TransactionPool + 'static,
19+
Pool::Transaction: MaybeConditionalTransaction,
20+
St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
21+
{
22+
async move {
23+
maintain_transaction_pool(pool, events).await;
24+
}
25+
.boxed()
26+
}
27+
28+
/// Maintains the state of the transaction pool by handling new blocks and reorgs.
29+
///
30+
/// This listens for any new blocks and reorgs and updates the transaction pool's state accordingly
31+
pub async fn maintain_transaction_pool<N, Pool, St>(pool: Pool, mut events: St)
32+
where
33+
N: NodePrimitives,
34+
Pool: TransactionPool,
35+
Pool::Transaction: MaybeConditionalTransaction,
36+
St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
37+
{
38+
loop {
39+
let Some(event) = events.next().await else { break };
40+
if let CanonStateNotification::Commit { new } = event {
41+
if new.is_empty() {
42+
continue;
43+
}
44+
let block_attr = BlockConditionalAttributes {
45+
number: new.tip().number(),
46+
timestamp: new.tip().timestamp(),
47+
};
48+
let mut to_remove = Vec::new();
49+
for tx in &pool.pooled_transactions() {
50+
if let Some(conditional) = tx.transaction.conditional() {
51+
if conditional.has_exceeded_block_attributes(&block_attr) {
52+
to_remove.push(*tx.hash());
53+
}
54+
}
55+
}
56+
let _ = pool.remove_transactions(to_remove);
57+
}
58+
}
59+
}

crates/optimism/txpool/src/transaction.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -60,17 +60,16 @@ impl<Cons: SignedTransaction, Pooled> OpPooledTransaction<Cons, Pooled> {
6060
self.conditional = Some(Box::new(conditional));
6161
self
6262
}
63-
64-
/// Conditional getter.
65-
pub fn conditional(&self) -> Option<&TransactionConditional> {
66-
self.conditional.as_deref()
67-
}
6863
}
6964

7065
impl<Cons, Pooled> MaybeConditionalTransaction for OpPooledTransaction<Cons, Pooled> {
7166
fn set_conditional(&mut self, conditional: TransactionConditional) {
7267
self.conditional = Some(Box::new(conditional))
7368
}
69+
70+
fn conditional(&self) -> Option<&TransactionConditional> {
71+
self.conditional.as_deref()
72+
}
7473
}
7574

7675
impl<Cons, Pooled> PoolTransaction for OpPooledTransaction<Cons, Pooled>

0 commit comments

Comments
 (0)