Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(op, txpool): add additional update routine for conditionals #14497

Merged
merged 6 commits into from
Feb 15, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 15 additions & 4 deletions crates/optimism/node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ impl<T> Default for OpPoolBuilder<T> {
impl<Node, T> PoolBuilder<Node> for OpPoolBuilder<T>
where
Node: FullNodeTypes<Types: NodeTypes<ChainSpec: OpHardforks>>,
T: EthPoolTransaction<Consensus = TxTy<Node::Types>>,
T: EthPoolTransaction<Consensus = TxTy<Node::Types>> + MaybeConditionalTransaction,
{
type Pool = OpTransactionPool<Node::Provider, DiskFileBlobStore, T>;

Expand Down Expand Up @@ -481,7 +481,7 @@ where
info!(target: "reth::cli", "Transaction pool initialized");
let transactions_path = data_dir.txpool_transactions();

// spawn txpool maintenance task
// spawn txpool maintenance tasks
{
let pool = transaction_pool.clone();
let chain_events = ctx.provider().canonical_state_stream();
Expand All @@ -500,18 +500,29 @@ where
},
);

// spawn the maintenance task
// spawn the main maintenance task
ctx.task_executor().spawn_critical(
"txpool maintenance task",
reth_transaction_pool::maintain::maintain_transaction_pool_future(
client,
pool,
pool.clone(),
chain_events,
ctx.task_executor().clone(),
Default::default(),
),
);
debug!(target: "reth::cli", "Spawned txpool maintenance task");

// spawn the Op txpool maintenance task
let chain_events = ctx.provider().canonical_state_stream();
ctx.task_executor().spawn_critical(
"Op txpool maintenance task",
reth_optimism_txpool::maintain::maintain_transaction_pool_future(
pool,
chain_events,
),
);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ideally this also has access to the enable conditional setting so we can spawn this based on that, I believe we can pass this setting down

debug!(target: "reth::cli", "Spawned Op extension txpool maintenance task");
}

Ok(transaction_pool)
Expand Down
2 changes: 2 additions & 0 deletions crates/optimism/txpool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ alloy-rpc-types-eth.workspace = true
# reth
reth-chainspec.workspace = true
reth-primitives-traits.workspace = true
reth-provider.workspace = true
reth-storage-api.workspace = true
reth-transaction-pool.workspace = true

Expand All @@ -37,6 +38,7 @@ reth-optimism-primitives = { workspace = true, features = ["reth-codec"] }
# misc
c-kzg.workspace = true
derive_more.workspace = true
futures-util.workspace = true
parking_lot.workspace = true

[dev-dependencies]
Expand Down
3 changes: 3 additions & 0 deletions crates/optimism/txpool/src/conditional.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ pub trait MaybeConditionalTransaction {
/// Attach a [`TransactionConditional`].
fn set_conditional(&mut self, conditional: TransactionConditional);

/// Get attached [`TransactionConditional`] if any.
fn conditional(&self) -> Option<&TransactionConditional>;

/// Helper that sets the conditional and returns the instance again
fn with_conditional(mut self, conditional: TransactionConditional) -> Self
where
Expand Down
1 change: 1 addition & 0 deletions crates/optimism/txpool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub use validator::{OpL1BlockInfo, OpTransactionValidator};
pub mod conditional;
mod transaction;
pub use transaction::OpPooledTransaction;
pub mod maintain;

use reth_transaction_pool::{CoinbaseTipOrdering, Pool, TransactionValidationTaskExecutor};

Expand Down
58 changes: 58 additions & 0 deletions crates/optimism/txpool/src/maintain.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
//! Support for maintaining the state of the transaction pool

use alloy_consensus::{conditional::BlockConditionalAttributes, BlockHeader};
use futures_util::{future::BoxFuture, FutureExt, Stream, StreamExt};
use reth_primitives_traits::NodePrimitives;
use reth_provider::CanonStateNotification;
use reth_transaction_pool::TransactionPool;

use crate::conditional::MaybeConditionalTransaction;

/// Returns a spawnable future for maintaining the state of the transaction pool.
pub fn maintain_transaction_pool_future<N, Pool, St>(
pool: Pool,
events: St,
) -> BoxFuture<'static, ()>
where
N: NodePrimitives,
Pool: TransactionPool + 'static,
Pool::Transaction: MaybeConditionalTransaction,
St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
{
async move {
maintain_transaction_pool(pool, events).await;
}
.boxed()
}

/// Maintains the state of the transaction pool by handling new blocks and reorgs.
///
/// This listens for any new blocks and reorgs and updates the transaction pool's state accordingly
pub async fn maintain_transaction_pool<N, Pool, St>(pool: Pool, mut events: St)
where
N: NodePrimitives,
Pool: TransactionPool,
Pool::Transaction: MaybeConditionalTransaction,
St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
{
loop {
let Some(event) = events.next().await else { break };
if let CanonStateNotification::Commit { new } = event {
if !new.is_empty() {
let header = new.tip().clone().into_header();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I blieve there should be clone_header

let mut to_remove = Vec::new();
for tx in &pool.pooled_transactions() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be fine, even with a low blocktime to run every 1-2s

@hai-rise is support for 4337 conditionals even relevant for you? if not you can just not spawn this task

if let Some(conditional) = tx.transaction.conditional() {
if conditional.has_exceeded_block_attributes(&BlockConditionalAttributes {
number: header.number(),
timestamp: header.timestamp(),
}) {
to_remove.push(*tx.hash());
}
}
}
let _ = pool.remove_transactions(to_remove);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we also want some metrics for this

}
}
}
}
9 changes: 4 additions & 5 deletions crates/optimism/txpool/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,17 +60,16 @@ impl<Cons: SignedTransaction, Pooled> OpPooledTransaction<Cons, Pooled> {
self.conditional = Some(Box::new(conditional));
self
}

/// Conditional getter.
pub fn conditional(&self) -> Option<&TransactionConditional> {
self.conditional.as_deref()
}
}

impl<Cons, Pooled> MaybeConditionalTransaction for OpPooledTransaction<Cons, Pooled> {
fn set_conditional(&mut self, conditional: TransactionConditional) {
self.conditional = Some(Box::new(conditional))
}

fn conditional(&self) -> Option<&TransactionConditional> {
self.conditional.as_deref()
}
}

impl<Cons, Pooled> PoolTransaction for OpPooledTransaction<Cons, Pooled>
Expand Down
Loading