|
18 | 18 | import io.optimism.utilities.rpc.Web3jProvider;
|
19 | 19 | import io.optimism.utilities.telemetry.Logging;
|
20 | 20 | import io.optimism.utilities.telemetry.TracerTaskWrapper;
|
21 |
| -import io.reactivex.BackpressureStrategy; |
22 |
| -import io.reactivex.Flowable; |
23 | 21 | import io.reactivex.disposables.Disposable;
|
24 | 22 | import java.math.BigInteger;
|
25 | 23 | import java.time.Duration;
|
26 | 24 | import java.util.*;
|
27 | 25 | import java.util.concurrent.ExecutionException;
|
28 |
| -import java.util.concurrent.ScheduledThreadPoolExecutor; |
29 | 26 | import java.util.concurrent.StructuredTaskScope;
|
30 |
| -import java.util.concurrent.TimeUnit; |
31 | 27 | import java.util.stream.Collectors;
|
32 | 28 | import org.apache.commons.collections4.CollectionUtils;
|
33 | 29 | import org.apache.commons.lang3.StringUtils;
|
@@ -159,8 +155,6 @@ public class InnerWatcher extends AbstractExecutionThreadService {
|
159 | 155 |
|
160 | 156 | private boolean devnet = false;
|
161 | 157 |
|
162 |
| - private ScheduledThreadPoolExecutor scheduledExecutorService; |
163 |
| - |
164 | 158 | /**
|
165 | 159 | * create a InnerWatcher instance.
|
166 | 160 | *
|
@@ -219,51 +213,23 @@ private void getMetadataFromL2(BigInteger l2StartBlock) {
|
219 | 213 | }
|
220 | 214 | }
|
221 | 215 |
|
222 |
| - private Disposable subscribeL1NewHeads() { |
223 |
| - if (this.wsProvider != null) { |
224 |
| - this.l1HeadListener = this.wsProvider |
225 |
| - .newHeadsNotifications() |
226 |
| - .subscribe( |
227 |
| - notification -> { |
228 |
| - NewHead header = notification.getParams().getResult(); |
229 |
| - String hash = header.getHash(); |
230 |
| - BigInteger number = Numeric.toBigInt(header.getNumber()); |
231 |
| - String parentHash = header.getParentHash(); |
232 |
| - BigInteger time = Numeric.toBigInt(header.getTimestamp()); |
233 |
| - l1Head = new BlockInfo(hash, number, parentHash, time); |
234 |
| - }, |
235 |
| - t -> { |
236 |
| - if (t instanceof WebsocketNotConnectedException) { |
237 |
| - this.subscribeL1NewHeads(); |
238 |
| - } |
239 |
| - }); |
240 |
| - } else { |
241 |
| - this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1); |
242 |
| - this.l1HeadListener = Flowable.create( |
243 |
| - (subscriber) -> { |
244 |
| - this.scheduledExecutorService.scheduleAtFixedRate( |
245 |
| - () -> { |
246 |
| - EthBlock.Block block = null; |
247 |
| - try { |
248 |
| - block = pollBlock( |
249 |
| - this.provider, DefaultBlockParameterName.LATEST, false); |
250 |
| - } catch (ExecutionException | InterruptedException e) { |
251 |
| - LOGGER.warn("error while fetching L1 data for block", e); |
252 |
| - } |
253 |
| - subscriber.onNext(block); |
254 |
| - }, |
255 |
| - 0, |
256 |
| - 12, |
257 |
| - TimeUnit.SECONDS); |
258 |
| - }, |
259 |
| - BackpressureStrategy.BUFFER) |
260 |
| - .subscribe(notification -> { |
261 |
| - EthBlock.Block block = (EthBlock.Block) notification; |
262 |
| - l1Head = BlockInfo.from(block); |
263 |
| - }); |
264 |
| - } |
265 |
| - |
266 |
| - return this.l1HeadListener; |
| 216 | + private void subscribeL1NewHeads() { |
| 217 | + this.l1HeadListener = this.wsProvider |
| 218 | + .newHeadsNotifications() |
| 219 | + .subscribe( |
| 220 | + notification -> { |
| 221 | + NewHead header = notification.getParams().getResult(); |
| 222 | + String hash = header.getHash(); |
| 223 | + BigInteger number = Numeric.toBigInt(header.getNumber()); |
| 224 | + String parentHash = header.getParentHash(); |
| 225 | + BigInteger time = Numeric.toBigInt(header.getTimestamp()); |
| 226 | + l1Head = new BlockInfo(hash, number, parentHash, time); |
| 227 | + }, |
| 228 | + t -> { |
| 229 | + if (t instanceof WebsocketNotConnectedException) { |
| 230 | + this.subscribeL1NewHeads(); |
| 231 | + } |
| 232 | + }); |
267 | 233 | }
|
268 | 234 |
|
269 | 235 | /**
|
@@ -654,9 +620,6 @@ protected void shutDown() {
|
654 | 620 | if (this.wsProvider != null) {
|
655 | 621 | this.wsProvider.shutdown();
|
656 | 622 | }
|
657 |
| - if (this.scheduledExecutorService != null) { |
658 |
| - this.scheduledExecutorService.shutdown(); |
659 |
| - } |
660 | 623 | }
|
661 | 624 |
|
662 | 625 | @Override
|
|
0 commit comments