|
26 | 26 |
|
27 | 27 | import java.util.Comparator;
|
28 | 28 | import java.util.Optional;
|
| 29 | +import java.util.concurrent.Executors; |
| 30 | +import java.util.concurrent.ScheduledExecutorService; |
| 31 | +import java.util.concurrent.ScheduledFuture; |
| 32 | +import java.util.concurrent.TimeUnit; |
| 33 | +import java.util.concurrent.TimeoutException; |
| 34 | +import java.util.concurrent.atomic.AtomicBoolean; |
29 | 35 | import java.util.concurrent.atomic.AtomicLong;
|
30 | 36 | import java.util.function.Consumer;
|
31 | 37 | import java.util.stream.Stream;
|
|
40 | 46 | public class TrieLogPruner implements TrieLogEvent.TrieLogObserver {
|
41 | 47 |
|
42 | 48 | private static final Logger LOG = LoggerFactory.getLogger(TrieLogPruner.class);
|
| 49 | + private static final int PRELOAD_TIMEOUT_IN_SECONDS = 30; |
43 | 50 |
|
44 | 51 | private final int pruningLimit;
|
45 | 52 | private final int loadingLimit;
|
@@ -83,38 +90,82 @@ public TrieLogPruner(
|
83 | 90 | BesuMetricCategory.PRUNER, "trie_log_pruned_orphan", "trie log pruned orphan");
|
84 | 91 | }
|
85 | 92 |
|
86 |
| - public int initialize() { |
87 |
| - return preloadQueue(); |
| 93 | + public void initialize() { |
| 94 | + preloadQueueWithTimeout(); |
88 | 95 | }
|
89 | 96 |
|
90 |
| - private int preloadQueue() { |
| 97 | + private void preloadQueueWithTimeout() { |
| 98 | + |
91 | 99 | LOG.atInfo()
|
92 |
| - .setMessage("Loading first {} trie logs from database...") |
| 100 | + .setMessage("Attempting to load first {} trie logs from database...") |
93 | 101 | .addArgument(loadingLimit)
|
94 | 102 | .log();
|
| 103 | + |
| 104 | + try (final ScheduledExecutorService preloadExecutor = Executors.newScheduledThreadPool(1)) { |
| 105 | + |
| 106 | + final AtomicBoolean timeoutOccurred = new AtomicBoolean(false); |
| 107 | + final Runnable timeoutTask = |
| 108 | + () -> { |
| 109 | + timeoutOccurred.set(true); |
| 110 | + LOG.atWarn() |
| 111 | + .setMessage( |
| 112 | + "Timeout occurred while loading and processing {} trie logs from database") |
| 113 | + .addArgument(loadingLimit) |
| 114 | + .log(); |
| 115 | + }; |
| 116 | + |
| 117 | + final ScheduledFuture<?> timeoutFuture = |
| 118 | + preloadExecutor.schedule(timeoutTask, PRELOAD_TIMEOUT_IN_SECONDS, TimeUnit.SECONDS); |
| 119 | + LOG.atInfo() |
| 120 | + .setMessage( |
| 121 | + "Trie log pruning will timeout after {} seconds. If this is timing out, consider using `besu storage trie-log prune` subcommand, see https://besu.hyperledger.org/public-networks/how-to/bonsai-limit-trie-logs") |
| 122 | + .addArgument(PRELOAD_TIMEOUT_IN_SECONDS) |
| 123 | + .log(); |
| 124 | + |
| 125 | + preloadQueue(timeoutOccurred, timeoutFuture); |
| 126 | + } |
| 127 | + } |
| 128 | + |
| 129 | + private void preloadQueue( |
| 130 | + final AtomicBoolean timeoutOccurred, final ScheduledFuture<?> timeoutFuture) { |
| 131 | + |
95 | 132 | try (final Stream<byte[]> trieLogKeys = rootWorldStateStorage.streamTrieLogKeys(loadingLimit)) {
|
96 |
| - final AtomicLong count = new AtomicLong(); |
| 133 | + |
| 134 | + final AtomicLong addToPruneQueueCount = new AtomicLong(); |
97 | 135 | final AtomicLong orphansPruned = new AtomicLong();
|
98 | 136 | trieLogKeys.forEach(
|
99 | 137 | blockHashAsBytes -> {
|
| 138 | + if (timeoutOccurred.get()) { |
| 139 | + throw new RuntimeException( |
| 140 | + new TimeoutException("Timeout occurred while preloading trie log prune queue")); |
| 141 | + } |
100 | 142 | final Hash blockHash = Hash.wrap(Bytes32.wrap(blockHashAsBytes));
|
101 | 143 | final Optional<BlockHeader> header = blockchain.getBlockHeader(blockHash);
|
102 | 144 | if (header.isPresent()) {
|
103 | 145 | addToPruneQueue(header.get().getNumber(), blockHash);
|
104 |
| - count.getAndIncrement(); |
| 146 | + addToPruneQueueCount.getAndIncrement(); |
105 | 147 | } else {
|
106 | 148 | // prune orphaned blocks (sometimes created during block production)
|
107 | 149 | rootWorldStateStorage.pruneTrieLog(blockHash);
|
108 | 150 | orphansPruned.getAndIncrement();
|
109 | 151 | prunedOrphanCounter.inc();
|
110 | 152 | }
|
111 | 153 | });
|
| 154 | + |
| 155 | + timeoutFuture.cancel(true); |
112 | 156 | LOG.atDebug().log("Pruned {} orphaned trie logs from database...", orphansPruned.intValue());
|
113 |
| - LOG.atInfo().log("Loaded {} trie logs from database", count); |
114 |
| - return pruneFromQueue() + orphansPruned.intValue(); |
| 157 | + LOG.atInfo().log( |
| 158 | + "Added {} trie logs to prune queue. Commencing pruning of eligible trie logs...", |
| 159 | + addToPruneQueueCount.intValue()); |
| 160 | + int prunedCount = pruneFromQueue(); |
| 161 | + LOG.atInfo().log("Pruned {} trie logs.", prunedCount); |
115 | 162 | } catch (Exception e) {
|
116 |
| - LOG.error("Error loading trie logs from database, nothing pruned", e); |
117 |
| - return 0; |
| 163 | + if (e.getCause() != null && e.getCause() instanceof TimeoutException) { |
| 164 | + int prunedCount = pruneFromQueue(); |
| 165 | + LOG.atInfo().log("Operation timed out, but still pruned {} trie logs.", prunedCount); |
| 166 | + } else { |
| 167 | + LOG.error("Error loading trie logs from database, nothing pruned", e); |
| 168 | + } |
118 | 169 | }
|
119 | 170 | }
|
120 | 171 |
|
|
0 commit comments