From 3aca3482bffe2213b0e3785a925abb10b43d3b06 Mon Sep 17 00:00:00 2001 From: Fabio Di Fabio Date: Tue, 19 Dec 2023 14:41:27 +0100 Subject: [PATCH] Introduce EthSchedule.OrderedProcessor Signed-off-by: Fabio Di Fabio --- .../EthGetFilterChangesIntegrationTest.java | 3 +- .../ethereum/eth/manager/EthScheduler.java | 49 ++++++++++ .../eth/transactions/TransactionPool.java | 90 ++++--------------- 3 files changed, 69 insertions(+), 73 deletions(-) diff --git a/ethereum/api/src/integration-test/java/org/hyperledger/besu/ethereum/api/jsonrpc/methods/fork/london/EthGetFilterChangesIntegrationTest.java b/ethereum/api/src/integration-test/java/org/hyperledger/besu/ethereum/api/jsonrpc/methods/fork/london/EthGetFilterChangesIntegrationTest.java index 0026c56ad848..49a2fbcd87b8 100644 --- a/ethereum/api/src/integration-test/java/org/hyperledger/besu/ethereum/api/jsonrpc/methods/fork/london/EthGetFilterChangesIntegrationTest.java +++ b/ethereum/api/src/integration-test/java/org/hyperledger/besu/ethereum/api/jsonrpc/methods/fork/london/EthGetFilterChangesIntegrationTest.java @@ -18,6 +18,7 @@ import static java.util.Collections.emptyList; import static java.util.stream.Collectors.toList; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -107,7 +108,7 @@ public void setUp() { blockchain::getChainHeadHeader); final ProtocolContext protocolContext = executionContext.getProtocolContext(); - EthContext ethContext = mock(EthContext.class); + EthContext ethContext = mock(EthContext.class, RETURNS_DEEP_STUBS); EthPeers ethPeers = mock(EthPeers.class); when(ethContext.getEthPeers()).thenReturn(ethPeers); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthScheduler.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthScheduler.java index 43f8d6a4c35b..61a01bf54f3f 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthScheduler.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthScheduler.java @@ -23,9 +23,11 @@ import java.time.Duration; import java.util.Collection; +import java.util.Queue; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -34,6 +36,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; @@ -295,4 +299,49 @@ public void failAfterTimeout(final CompletableFuture promise, final Durat delay, unit); } + + public OrderedProcessor createOrderedProcessor(final Consumer processor) { + return new OrderedProcessor<>(processor); + } + + /** + * This class is a way to execute a set of tasks, one by one, in a strict order, without blocking + * the caller in case there are still previous tasks queued + * + * @param the class of item to be processed + */ + public class OrderedProcessor { + private final Queue blockAddedQueue = new ConcurrentLinkedQueue<>(); + private final ReentrantLock blockAddedLock = new ReentrantLock(); + private final Consumer processor; + + private OrderedProcessor(final Consumer processor) { + this.processor = processor; + } + + public void submit(final ITEM item) { + // add the item to the processing queue + blockAddedQueue.add(item); + + if (blockAddedLock.hasQueuedThreads()) { + // another thread is already waiting to process the queue with our item, there is no need to + // schedule another thread + LOG.trace( + "Block added event queue is already being processed and an already queued thread is present, nothing to do"); + } else { + servicesExecutor.submit( + () -> { + blockAddedLock.lock(); + try { + // now that we have the lock, process as many items as possible + for (ITEM i = blockAddedQueue.poll(); i != null; i = blockAddedQueue.poll()) { + processor.accept(i); + } + } finally { + blockAddedLock.unlock(); + } + }); + } + } + } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPool.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPool.java index 5cb5bf82fa37..a3d164105d02 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPool.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPool.java @@ -31,6 +31,7 @@ import org.hyperledger.besu.ethereum.core.Transaction; import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.manager.EthPeer; +import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; import org.hyperledger.besu.ethereum.mainnet.TransactionValidationParams; import org.hyperledger.besu.ethereum.mainnet.TransactionValidator; @@ -53,7 +54,6 @@ import java.io.IOException; import java.math.BigInteger; import java.nio.charset.StandardCharsets; -import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; @@ -62,11 +62,9 @@ import java.util.Map; import java.util.Optional; import java.util.OptionalLong; -import java.util.Queue; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -108,9 +106,7 @@ public class TransactionPool implements BlockAddedObserver { private volatile OptionalLong subscribeConnectId = OptionalLong.empty(); private final SaveRestoreManager saveRestoreManager = new SaveRestoreManager(); private final Set
localSenders = ConcurrentHashMap.newKeySet(); - private final ReentrantLock blockAddedLock = new ReentrantLock(); - private final AtomicReference blockAddedQueuedThread = new AtomicReference<>(); - private final Queue blockAddedQueue = new ConcurrentLinkedQueue<>(); + private final EthScheduler.OrderedProcessor blockAddedEventOrderedProcessor; public TransactionPool( final Supplier pendingTransactionsSupplier, @@ -132,6 +128,8 @@ public TransactionPool( pluginTransactionValidatorFactory == null ? null : pluginTransactionValidatorFactory.create(); + this.blockAddedEventOrderedProcessor = + ethContext.getScheduler().createOrderedProcessor(this::processBlockAddedEvent); initLogForReplay(); } @@ -327,76 +325,24 @@ public void onBlockAdded(final BlockAddedEvent event) { if (event.getEventType().equals(BlockAddedEvent.EventType.HEAD_ADVANCED) || event.getEventType().equals(BlockAddedEvent.EventType.CHAIN_REORG)) { - // add the event to the processing queue - blockAddedQueue.add(event); - processBlockAddedQueue(); + blockAddedEventOrderedProcessor.submit(event); } } } - private void processBlockAddedQueue() { - // we want to process the added block asynchronously, - // but at the same time we must ensure that blocks are processed in order one at time - ethContext - .getScheduler() - .scheduleServiceTask( - () -> { - // if we were the thread waiting to be executed, then clear the queued thread - blockAddedQueuedThread.compareAndSet(Thread.currentThread(), null); - - int blockProcessed = 0; - if (!blockAddedQueue.isEmpty()) { - if (blockAddedLock.tryLock()) { - // no other thread is processing the queue, so start processing it - try { - for (BlockAddedEvent e = blockAddedQueue.poll(); - e != null; - e = blockAddedQueue.poll()) { - final long started = System.currentTimeMillis(); - pendingTransactions.manageBlockAdded( - e.getBlock().getHeader(), - e.getAddedTransactions(), - e.getRemovedTransactions(), - protocolSchedule - .getByBlockHeader(e.getBlock().getHeader()) - .getFeeMarket()); - reAddTransactions(e.getRemovedTransactions()); - LOG.atTrace() - .setMessage( - "Block added event {} processed in {}ms, block processed by this thread {}") - .addArgument(e) - .addArgument(() -> System.currentTimeMillis() - started) - .addArgument(++blockProcessed) - .log(); - } - } finally { - blockAddedLock.unlock(); - } - } else { - blockAddedQueuedThread.getAndUpdate( - qt -> { - if (qt == null) { - // if no queued thread, then try later - LOG.trace( - "Block added event queue already being processed, retry later, queue thread {}", - qt); - ethContext - .getScheduler() - .scheduleFutureTask( - this::processBlockAddedQueue, Duration.ofMillis(100)); - } else { - LOG.trace( - "Block added event queue already being processed and an already queued thread present {}, nothing to do", - qt); - } - // record ourselves as queued thread - return Thread.currentThread(); - }); - return null; - } - } - return null; - }); + private void processBlockAddedEvent(final BlockAddedEvent e) { + final long started = System.currentTimeMillis(); + pendingTransactions.manageBlockAdded( + e.getBlock().getHeader(), + e.getAddedTransactions(), + e.getRemovedTransactions(), + protocolSchedule.getByBlockHeader(e.getBlock().getHeader()).getFeeMarket()); + reAddTransactions(e.getRemovedTransactions()); + LOG.atTrace() + .setMessage("Block added event {} processed in {}ms") + .addArgument(e) + .addArgument(() -> System.currentTimeMillis() - started) + .log(); } private void reAddTransactions(final List reAddTransactions) {