Skip to content

Commit

Permalink
add tx listener logic to pool and sealer
Browse files Browse the repository at this point in the history
  • Loading branch information
itegulov committed Dec 10, 2024
1 parent b309377 commit 8bd39cb
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 38 deletions.
2 changes: 1 addition & 1 deletion crates/cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ async fn main() -> anyhow::Result<()> {
} else if let Some(block_time) = config.block_time {
BlockSealerMode::fixed_time(config.max_transactions, block_time)
} else {
BlockSealerMode::immediate(config.max_transactions)
BlockSealerMode::immediate(config.max_transactions, pool.add_tx_listener())
};
let block_sealer = BlockSealer::new(sealing_mode);

Expand Down
34 changes: 21 additions & 13 deletions crates/core/src/node/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::node::error::LoadStateError;
use crate::node::impersonate::{ImpersonationManager, ImpersonationState};
use crate::node::state::{StateV1, VersionedState};
use crate::node::time::{AdvanceTime, ReadTime, TimestampManager};
use crate::node::{BlockSealer, TxPool};
use crate::node::{BlockSealer, BlockSealerMode, TxPool};
use crate::{
bootloader_debug::{BootloaderDebug, BootloaderDebugTracer},
console_log::ConsoleLogHandler,
Expand Down Expand Up @@ -1109,14 +1109,16 @@ fn contract_address_from_tx_result(execution_result: &VmExecutionResultAndLogs)
impl<S: ForkSource + std::fmt::Debug + Clone> Default for InMemoryNode<S> {
fn default() -> Self {
let impersonation = ImpersonationManager::default();
let pool = TxPool::new(impersonation.clone());
let tx_listener = pool.add_tx_listener();
InMemoryNode::new(
None,
None,
&TestNodeConfig::default(),
TimestampManager::default(),
impersonation.clone(),
TxPool::new(impersonation),
BlockSealer::default(),
impersonation,
pool,
BlockSealer::new(BlockSealerMode::immediate(1000, tx_listener)),
)
}
}
Expand Down Expand Up @@ -1160,14 +1162,16 @@ impl<S: ForkSource + std::fmt::Debug + Clone> InMemoryNode<S> {
// TODO: Refactor InMemoryNode with a builder pattern
pub fn default_fork(fork: Option<ForkDetails>) -> Self {
let impersonation = ImpersonationManager::default();
let pool = TxPool::new(impersonation.clone());
let tx_listener = pool.add_tx_listener();
Self::new(
fork,
None,
&Default::default(),
TimestampManager::default(),
impersonation.clone(),
TxPool::new(impersonation),
BlockSealer::default(),
impersonation,
pool,
BlockSealer::new(BlockSealerMode::immediate(1000, tx_listener)),
)
}

Expand Down Expand Up @@ -2155,6 +2159,8 @@ mod tests {
raw_storage: external_storage.inner.read().unwrap().raw_storage.clone(),
};
let impersonation = ImpersonationManager::default();
let pool = TxPool::new(impersonation.clone());
let sealer = BlockSealer::new(BlockSealerMode::immediate(1000, pool.add_tx_listener()));
let node: InMemoryNode<testing::ExternalStorage> = InMemoryNode::new(
Some(ForkDetails {
fork_source: Box::new(mock_db),
Expand All @@ -2176,9 +2182,9 @@ mod tests {
None,
&Default::default(),
TimestampManager::default(),
impersonation.clone(),
TxPool::new(impersonation),
BlockSealer::default(),
impersonation,
pool,
sealer,
);

let tx = testing::TransactionBuilder::new().build();
Expand All @@ -2193,6 +2199,8 @@ mod tests {
#[tokio::test]
async fn test_transact_returns_data_in_built_in_without_security_mode() {
let impersonation = ImpersonationManager::default();
let pool = TxPool::new(impersonation.clone());
let sealer = BlockSealer::new(BlockSealerMode::immediate(1000, pool.add_tx_listener()));
let node = InMemoryNode::<HttpForkSource>::new(
None,
None,
Expand All @@ -2201,9 +2209,9 @@ mod tests {
..Default::default()
},
TimestampManager::default(),
impersonation.clone(),
TxPool::new(impersonation),
BlockSealer::default(),
impersonation,
pool,
sealer,
);

let private_key = K256PrivateKey::from_bytes(H256::repeat_byte(0xef)).unwrap();
Expand Down
5 changes: 4 additions & 1 deletion crates/core/src/node/in_memory_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,12 +400,14 @@ impl<S: ForkSource + std::fmt::Debug + Clone + Send + Sync + 'static> InMemoryNo

pub fn set_immediate_sealing(&self, enable: bool) -> Result<()> {
if enable {
let listener = self.pool.add_tx_listener();
self.sealer.set_mode(BlockSealerMode::immediate(
self.inner
.read()
.map_err(|err| anyhow!("failed acquiring lock: {:?}", err))?
.config
.max_transactions,
listener,
))
} else {
self.sealer.set_mode(BlockSealerMode::Noop)
Expand Down Expand Up @@ -617,6 +619,7 @@ mod tests {
previous_states: Default::default(),
};
let pool = TxPool::new(impersonation.clone());
let sealer = BlockSealer::new(BlockSealerMode::immediate(1000, pool.add_tx_listener()));

let node = InMemoryNode::<HttpForkSource> {
inner: Arc::new(RwLock::new(old_inner)),
Expand All @@ -626,7 +629,7 @@ mod tests {
impersonation,
observability: None,
pool,
sealer: BlockSealer::default(),
sealer,
system_contracts: Default::default(),
};

Expand Down
44 changes: 42 additions & 2 deletions crates/core/src/node/pool.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,41 @@
use crate::node::impersonate::ImpersonationManager;
use futures::channel::mpsc::{channel, Receiver, Sender};
use itertools::Itertools;
use std::sync::{Arc, RwLock};
use std::sync::{Arc, Mutex, RwLock};
use zksync_types::l2::L2Tx;
use zksync_types::{Address, H256};

#[derive(Clone)]
pub struct TxPool {
inner: Arc<RwLock<Vec<L2Tx>>>,
/// Listeners for new transactions' hashes
tx_listeners: Arc<Mutex<Vec<Sender<H256>>>>,
pub(crate) impersonation: ImpersonationManager,
}

impl TxPool {
pub fn new(impersonation: ImpersonationManager) -> Self {
Self {
inner: Arc::new(RwLock::new(Vec::new())),
tx_listeners: Arc::new(Mutex::new(Vec::new())),
impersonation,
}
}

pub fn add_tx(&self, tx: L2Tx) {
let mut guard = self.inner.write().expect("TxPool lock is poisoned");
let hash = tx.hash();
guard.push(tx);
self.notify_listeners(hash);
}

pub fn add_txs(&self, txs: impl IntoIterator<Item = L2Tx>) {
let mut guard = self.inner.write().expect("TxPool lock is poisoned");
guard.extend(txs);
for tx in txs {
let hash = tx.hash();
guard.push(tx);
self.notify_listeners(hash);
}
}

/// Removes a single transaction from the pool
Expand Down Expand Up @@ -82,6 +92,36 @@ impl TxPool {
let txs = guard.drain(0..tx_count).collect();
Some(TxBatch { impersonating, txs })
}

/// Adds a new transaction listener to the pool that gets notified about every new transaction.
pub fn add_tx_listener(&self) -> Receiver<H256> {
const TX_LISTENER_BUFFER_SIZE: usize = 2048;
let (tx, rx) = channel(TX_LISTENER_BUFFER_SIZE);
self.tx_listeners
.lock()
.expect("TxPool lock is poisoned")
.push(tx);
rx
}

/// Notifies all listeners about the transaction.
fn notify_listeners(&self, tx_hash: H256) {
let mut tx_listeners = self.tx_listeners.lock().expect("TxPool lock is poisoned");
tx_listeners.retain_mut(|listener| match listener.try_send(tx_hash) {
Ok(()) => true,
Err(e) => {
if e.is_full() {
tracing::warn!(
%tx_hash,
"Failed to send transaction notification because channel is full",
);
true
} else {
false
}
}
});
}
}

// Test utilities
Expand Down
62 changes: 41 additions & 21 deletions crates/core/src/node/sealer.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
use crate::node::pool::{TxBatch, TxPool};
use futures::channel::mpsc::Receiver;
use futures::stream::{Fuse, StreamExt};
use futures::task::AtomicWaker;
use futures::Stream;
use std::pin::Pin;
use std::sync::{Arc, RwLock};
use std::task::{Context, Poll};
use std::time::Duration;
use tokio::time::{Interval, MissedTickBehavior};
use zksync_types::H256;

#[derive(Clone, Debug)]
pub struct BlockSealer {
Expand All @@ -13,12 +18,6 @@ pub struct BlockSealer {
waker: Arc<AtomicWaker>,
}

impl Default for BlockSealer {
fn default() -> Self {
BlockSealer::new(BlockSealerMode::immediate(1000))
}
}

impl BlockSealer {
pub fn new(mode: BlockSealerMode) -> Self {
Self {
Expand All @@ -45,7 +44,7 @@ impl BlockSealer {
let mut mode = self.mode.write().expect("BlockSealer lock is poisoned");
match &mut *mode {
BlockSealerMode::Noop => Poll::Pending,
BlockSealerMode::Immediate(immediate) => immediate.poll(pool),
BlockSealerMode::Immediate(immediate) => immediate.poll(pool, cx),
BlockSealerMode::FixedTime(fixed) => fixed.poll(pool, cx),
}
}
Expand All @@ -67,8 +66,11 @@ impl BlockSealerMode {
Self::Noop
}

pub fn immediate(max_transactions: usize) -> Self {
Self::Immediate(ImmediateBlockSealer { max_transactions })
pub fn immediate(max_transactions: usize, listener: Receiver<H256>) -> Self {
Self::Immediate(ImmediateBlockSealer {
max_transactions,
rx: listener.fuse(),
})
}

pub fn fixed_time(max_transactions: usize, block_time: Duration) -> Self {
Expand All @@ -78,7 +80,7 @@ impl BlockSealerMode {
pub fn poll(&mut self, pool: &TxPool, cx: &mut Context<'_>) -> Poll<TxBatch> {
match self {
BlockSealerMode::Noop => Poll::Pending,
BlockSealerMode::Immediate(immediate) => immediate.poll(pool),
BlockSealerMode::Immediate(immediate) => immediate.poll(pool, cx),
BlockSealerMode::FixedTime(fixed) => fixed.poll(pool, cx),
}
}
Expand All @@ -88,15 +90,28 @@ impl BlockSealerMode {
pub struct ImmediateBlockSealer {
/// Maximum number of transactions to include in a block.
max_transactions: usize,
/// Receives hashes of new transactions.
rx: Fuse<Receiver<H256>>,
}

impl ImmediateBlockSealer {
pub fn poll(&mut self, pool: &TxPool) -> Poll<TxBatch> {
let Some(tx_batch) = pool.take_uniform(self.max_transactions) else {
return Poll::Pending;
};

Poll::Ready(tx_batch)
pub fn poll(&mut self, pool: &TxPool, cx: &mut Context<'_>) -> Poll<TxBatch> {
match pool.take_uniform(self.max_transactions) {
Some(tx_batch) => Poll::Ready(tx_batch),
None => {
let mut has_new_txs = false;
// Yield until new transactions are available in the pool
while let Poll::Ready(Some(_hash)) = Pin::new(&mut self.rx).poll_next(cx) {
has_new_txs = true;
}

if has_new_txs {
self.poll(pool, cx)
} else {
Poll::Pending
}
}
}
}
}

Expand Down Expand Up @@ -161,7 +176,8 @@ mod tests {
#[test]
fn immediate_empty() {
let pool = TxPool::new(ImpersonationManager::default());
let mut block_sealer = BlockSealer::new(BlockSealerMode::immediate(1000));
let mut block_sealer =
BlockSealer::new(BlockSealerMode::immediate(1000, pool.add_tx_listener()));
let waker = &WAKER_NOOP;
let mut cx = Context::from_waker(waker);

Expand All @@ -171,7 +187,8 @@ mod tests {
#[test]
fn immediate_one_tx() {
let pool = TxPool::new(ImpersonationManager::default());
let mut block_sealer = BlockSealer::new(BlockSealerMode::immediate(1000));
let mut block_sealer =
BlockSealer::new(BlockSealerMode::immediate(1000, pool.add_tx_listener()));
let waker = &WAKER_NOOP;
let mut cx = Context::from_waker(waker);

Expand All @@ -190,7 +207,8 @@ mod tests {
#[test]
fn immediate_several_txs() {
let pool = TxPool::new(ImpersonationManager::default());
let mut block_sealer = BlockSealer::new(BlockSealerMode::immediate(1000));
let mut block_sealer =
BlockSealer::new(BlockSealerMode::immediate(1000, pool.add_tx_listener()));
let waker = &WAKER_NOOP;
let mut cx = Context::from_waker(waker);

Expand All @@ -209,7 +227,8 @@ mod tests {
#[test]
fn immediate_respect_max_txs() {
let pool = TxPool::new(ImpersonationManager::default());
let mut block_sealer = BlockSealer::new(BlockSealerMode::immediate(3));
let mut block_sealer =
BlockSealer::new(BlockSealerMode::immediate(3, pool.add_tx_listener()));
let waker = &WAKER_NOOP;
let mut cx = Context::from_waker(waker);

Expand All @@ -229,7 +248,8 @@ mod tests {
#[test]
fn immediate_gradual_txs() {
let pool = TxPool::new(ImpersonationManager::default());
let mut block_sealer = BlockSealer::new(BlockSealerMode::immediate(1000));
let mut block_sealer =
BlockSealer::new(BlockSealerMode::immediate(1000, pool.add_tx_listener()));
let waker = &WAKER_NOOP;
let mut cx = Context::from_waker(waker);

Expand Down

0 comments on commit 8bd39cb

Please sign in to comment.