Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(op, txpool): add additional update routine for conditionals #14497

Merged
merged 6 commits into from
Feb 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

41 changes: 35 additions & 6 deletions crates/optimism/node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,10 @@ impl OpNode {
self.args;
ComponentsBuilder::default()
.node_types::<Node>()
.pool(OpPoolBuilder::default())
.pool(
OpPoolBuilder::default()
.with_enable_tx_conditional(self.args.enable_tx_conditional),
Comment on lines +111 to +112
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

great that we can already propagate additional cli settings like this

)
.payload(
OpPayloadBuilder::new(compute_pending_block).with_da_config(self.da_config.clone()),
)
Expand Down Expand Up @@ -433,20 +436,33 @@ where
pub struct OpPoolBuilder<T = crate::txpool::OpPooledTransaction> {
/// Enforced overrides that are applied to the pool config.
pub pool_config_overrides: PoolBuilderConfigOverrides,
/// Enable transaction conditionals.
pub enable_tx_conditional: bool,
/// Marker for the pooled transaction type.
_pd: core::marker::PhantomData<T>,
}

impl<T> Default for OpPoolBuilder<T> {
fn default() -> Self {
Self { pool_config_overrides: Default::default(), _pd: Default::default() }
Self {
pool_config_overrides: Default::default(),
enable_tx_conditional: false,
_pd: Default::default(),
}
}
}

impl<T> OpPoolBuilder<T> {
fn with_enable_tx_conditional(mut self, enable_tx_conditional: bool) -> Self {
self.enable_tx_conditional = enable_tx_conditional;
self
}
}

impl<Node, T> PoolBuilder<Node> for OpPoolBuilder<T>
where
Node: FullNodeTypes<Types: NodeTypes<ChainSpec: OpHardforks>>,
T: EthPoolTransaction<Consensus = TxTy<Node::Types>>,
T: EthPoolTransaction<Consensus = TxTy<Node::Types>> + MaybeConditionalTransaction,
{
type Pool = OpTransactionPool<Node::Provider, DiskFileBlobStore, T>;

Expand Down Expand Up @@ -481,7 +497,7 @@ where
info!(target: "reth::cli", "Transaction pool initialized");
let transactions_path = data_dir.txpool_transactions();

// spawn txpool maintenance task
// spawn txpool maintenance tasks
{
let pool = transaction_pool.clone();
let chain_events = ctx.provider().canonical_state_stream();
Expand All @@ -500,18 +516,31 @@ where
},
);

// spawn the maintenance task
// spawn the main maintenance task
ctx.task_executor().spawn_critical(
"txpool maintenance task",
reth_transaction_pool::maintain::maintain_transaction_pool_future(
client,
pool,
pool.clone(),
chain_events,
ctx.task_executor().clone(),
Default::default(),
),
);
debug!(target: "reth::cli", "Spawned txpool maintenance task");

if self.enable_tx_conditional {
// spawn the Op txpool maintenance task
let chain_events = ctx.provider().canonical_state_stream();
ctx.task_executor().spawn_critical(
"Op txpool maintenance task",
reth_optimism_txpool::maintain::maintain_transaction_pool_future(
pool,
chain_events,
),
);
debug!(target: "reth::cli", "Spawned Op txpool maintenance task");
}
}

Ok(transaction_pool)
Expand Down
2 changes: 2 additions & 0 deletions crates/optimism/txpool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ alloy-rpc-types-eth.workspace = true
# reth
reth-chainspec.workspace = true
reth-primitives-traits.workspace = true
reth-chain-state.workspace = true
reth-storage-api.workspace = true
reth-transaction-pool.workspace = true

Expand All @@ -37,6 +38,7 @@ reth-optimism-primitives = { workspace = true, features = ["reth-codec"] }
# misc
c-kzg.workspace = true
derive_more.workspace = true
futures-util.workspace = true
parking_lot.workspace = true

[dev-dependencies]
Expand Down
3 changes: 3 additions & 0 deletions crates/optimism/txpool/src/conditional.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ pub trait MaybeConditionalTransaction {
/// Attach a [`TransactionConditional`].
fn set_conditional(&mut self, conditional: TransactionConditional);

/// Get attached [`TransactionConditional`] if any.
fn conditional(&self) -> Option<&TransactionConditional>;

/// Helper that sets the conditional and returns the instance again
fn with_conditional(mut self, conditional: TransactionConditional) -> Self
where
Expand Down
1 change: 1 addition & 0 deletions crates/optimism/txpool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub use validator::{OpL1BlockInfo, OpTransactionValidator};
pub mod conditional;
mod transaction;
pub use transaction::OpPooledTransaction;
pub mod maintain;

use reth_transaction_pool::{CoinbaseTipOrdering, Pool, TransactionValidationTaskExecutor};

Expand Down
59 changes: 59 additions & 0 deletions crates/optimism/txpool/src/maintain.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
//! Support for maintaining the state of the transaction pool

use alloy_consensus::{conditional::BlockConditionalAttributes, BlockHeader};
use futures_util::{future::BoxFuture, FutureExt, Stream, StreamExt};
use reth_chain_state::CanonStateNotification;
use reth_primitives_traits::NodePrimitives;
use reth_transaction_pool::TransactionPool;

use crate::conditional::MaybeConditionalTransaction;

/// Returns a spawnable future for maintaining the state of the transaction pool.
pub fn maintain_transaction_pool_future<N, Pool, St>(
pool: Pool,
events: St,
) -> BoxFuture<'static, ()>
where
N: NodePrimitives,
Pool: TransactionPool + 'static,
Pool::Transaction: MaybeConditionalTransaction,
St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
{
async move {
maintain_transaction_pool(pool, events).await;
}
.boxed()
}

/// Maintains the state of the transaction pool by handling new blocks and reorgs.
///
/// This listens for any new blocks and reorgs and updates the transaction pool's state accordingly
pub async fn maintain_transaction_pool<N, Pool, St>(pool: Pool, mut events: St)
where
N: NodePrimitives,
Pool: TransactionPool,
Pool::Transaction: MaybeConditionalTransaction,
St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
{
loop {
let Some(event) = events.next().await else { break };
if let CanonStateNotification::Commit { new } = event {
if new.is_empty() {
continue;
}
let block_attr = BlockConditionalAttributes {
number: new.tip().number(),
timestamp: new.tip().timestamp(),
};
let mut to_remove = Vec::new();
for tx in &pool.pooled_transactions() {
if let Some(conditional) = tx.transaction.conditional() {
if conditional.has_exceeded_block_attributes(&block_attr) {
to_remove.push(*tx.hash());
}
}
}
let _ = pool.remove_transactions(to_remove);
}
}
}
9 changes: 4 additions & 5 deletions crates/optimism/txpool/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,17 +60,16 @@ impl<Cons: SignedTransaction, Pooled> OpPooledTransaction<Cons, Pooled> {
self.conditional = Some(Box::new(conditional));
self
}

/// Conditional getter.
pub fn conditional(&self) -> Option<&TransactionConditional> {
self.conditional.as_deref()
}
}

impl<Cons, Pooled> MaybeConditionalTransaction for OpPooledTransaction<Cons, Pooled> {
fn set_conditional(&mut self, conditional: TransactionConditional) {
self.conditional = Some(Box::new(conditional))
}

fn conditional(&self) -> Option<&TransactionConditional> {
self.conditional.as_deref()
}
}

impl<Cons, Pooled> PoolTransaction for OpPooledTransaction<Cons, Pooled>
Expand Down
Loading