18
18
import io .optimism .utilities .rpc .Web3jProvider ;
19
19
import io .optimism .utilities .telemetry .Logging ;
20
20
import io .optimism .utilities .telemetry .TracerTaskWrapper ;
21
+ import io .reactivex .BackpressureStrategy ;
22
+ import io .reactivex .Flowable ;
21
23
import io .reactivex .disposables .Disposable ;
22
24
import java .math .BigInteger ;
23
25
import java .time .Duration ;
24
26
import java .util .*;
25
27
import java .util .concurrent .ExecutionException ;
28
+ import java .util .concurrent .ScheduledThreadPoolExecutor ;
26
29
import java .util .concurrent .StructuredTaskScope ;
30
+ import java .util .concurrent .TimeUnit ;
27
31
import java .util .stream .Collectors ;
28
32
import org .apache .commons .collections4 .CollectionUtils ;
29
33
import org .apache .commons .lang3 .StringUtils ;
45
49
import org .web3j .protocol .core .methods .response .EthBlock ;
46
50
import org .web3j .protocol .core .methods .response .EthLog ;
47
51
import org .web3j .protocol .core .methods .response .EthLog .LogObject ;
48
- import org .web3j .protocol .core .methods .response .EthLog .LogResult ;
49
52
import org .web3j .protocol .websocket .events .NewHead ;
50
53
import org .web3j .tuples .generated .Tuple2 ;
51
54
import org .web3j .tuples .generated .Tuple3 ;
@@ -85,7 +88,7 @@ public class InnerWatcher extends AbstractExecutionThreadService {
85
88
*/
86
89
private final Web3j provider ;
87
90
88
- private final Web3j wsProvider ;
91
+ private Web3j wsProvider ;
89
92
90
93
/**
91
94
* Beacon blob fetcher to fetch the beacon blob from the L1 beacon endpoint.
@@ -156,6 +159,8 @@ public class InnerWatcher extends AbstractExecutionThreadService {
156
159
157
160
private boolean devnet = false ;
158
161
162
+ private ScheduledThreadPoolExecutor scheduledExecutorService ;
163
+
159
164
/**
160
165
* create a InnerWatcher instance.
161
166
*
@@ -168,7 +173,9 @@ public InnerWatcher(
168
173
Config config , MessagePassingQueue <BlockUpdate > queue , BigInteger l1StartBlock , BigInteger l2StartBlock ) {
169
174
this .config = config ;
170
175
this .provider = Web3jProvider .createClient (config .l1RpcUrl ());
171
- this .wsProvider = Web3jProvider .createClient (config .l1WsRpcUrl ());
176
+ if (StringUtils .isNotEmpty (config .l1WsRpcUrl ())) {
177
+ this .wsProvider = Web3jProvider .createClient (config .l1WsRpcUrl ());
178
+ }
172
179
this .beaconFetcher = new BeaconBlobFetcher (config .l1BeaconUrl (), config .l1BeaconArchiverUrl ());
173
180
this .l2StartBlock = l2StartBlock ;
174
181
this .devnet = config .devnet () != null && config .devnet ();
@@ -213,22 +220,49 @@ private void getMetadataFromL2(BigInteger l2StartBlock) {
213
220
}
214
221
215
222
private Disposable subscribeL1NewHeads () {
216
- this .l1HeadListener = this .wsProvider
217
- .newHeadsNotifications ()
218
- .subscribe (
219
- notification -> {
220
- NewHead header = notification .getParams ().getResult ();
221
- String hash = header .getHash ();
222
- BigInteger number = Numeric .toBigInt (header .getNumber ());
223
- String parentHash = header .getParentHash ();
224
- BigInteger time = Numeric .toBigInt (header .getTimestamp ());
225
- l1Head = new BlockInfo (hash , number , parentHash , time );
226
- },
227
- t -> {
228
- if (t instanceof WebsocketNotConnectedException ) {
229
- this .subscribeL1NewHeads ();
230
- }
231
- });
223
+ if (this .wsProvider != null ) {
224
+ this .l1HeadListener = this .wsProvider
225
+ .newHeadsNotifications ()
226
+ .subscribe (
227
+ notification -> {
228
+ NewHead header = notification .getParams ().getResult ();
229
+ String hash = header .getHash ();
230
+ BigInteger number = Numeric .toBigInt (header .getNumber ());
231
+ String parentHash = header .getParentHash ();
232
+ BigInteger time = Numeric .toBigInt (header .getTimestamp ());
233
+ l1Head = new BlockInfo (hash , number , parentHash , time );
234
+ },
235
+ t -> {
236
+ if (t instanceof WebsocketNotConnectedException ) {
237
+ this .subscribeL1NewHeads ();
238
+ }
239
+ });
240
+ } else {
241
+ this .scheduledExecutorService = new ScheduledThreadPoolExecutor (1 );
242
+ this .l1HeadListener = Flowable .create (
243
+ (subscriber ) -> {
244
+ this .scheduledExecutorService .scheduleAtFixedRate (
245
+ () -> {
246
+ EthBlock .Block block = null ;
247
+ try {
248
+ block = pollBlock (
249
+ this .provider , DefaultBlockParameterName .LATEST , false );
250
+ } catch (ExecutionException | InterruptedException e ) {
251
+ LOGGER .warn ("error while fetching L1 data for block" , e );
252
+ }
253
+ subscriber .onNext (block );
254
+ },
255
+ 0 ,
256
+ 12 ,
257
+ TimeUnit .SECONDS );
258
+ },
259
+ BackpressureStrategy .BUFFER )
260
+ .subscribe (notification -> {
261
+ EthBlock .Block block = (EthBlock .Block ) notification ;
262
+ l1Head = BlockInfo .from (block );
263
+ });
264
+ }
265
+
232
266
return this .l1HeadListener ;
233
267
}
234
268
@@ -381,30 +415,45 @@ private void putBlockUpdate(final BlockUpdate update) {
381
415
382
416
private void updateSystemConfig (BlockInfo l1BlockInfo ) throws ExecutionException , InterruptedException {
383
417
BigInteger preLastUpdateBlock = this .systemConfigUpdate .component1 ();
384
- if (preLastUpdateBlock .compareTo (this .currentBlock ) < 0 ) {
385
- BigInteger toBlock = preLastUpdateBlock .add (BigInteger .valueOf (1000L ));
418
+ if (preLastUpdateBlock .compareTo (this .currentBlock ) <= 0 ) {
419
+ BigInteger fromBlock = preLastUpdateBlock .equals (BigInteger .ZERO )
420
+ ? BigInteger .ZERO
421
+ : preLastUpdateBlock .add (BigInteger .ONE );
422
+ BigInteger toBlock = preLastUpdateBlock .add (BigInteger .valueOf (100L ));
386
423
LOGGER .debug (
387
424
"will get system update eth log: fromBlock={} -> toBlock={}; contract={}" ,
388
- preLastUpdateBlock . add ( BigInteger . ONE ) ,
425
+ fromBlock ,
389
426
toBlock ,
390
427
InnerWatcher .this .config .chainConfig ().systemConfigContract ());
391
428
EthLog updates = this .getLog (
392
- preLastUpdateBlock . add ( BigInteger . ONE ) ,
429
+ fromBlock ,
393
430
toBlock ,
394
431
InnerWatcher .this .config .chainConfig ().systemConfigContract (),
395
432
CONFIG_UPDATE_TOPIC );
396
433
397
434
if (updates .getLogs ().isEmpty ()) {
398
435
this .systemConfigUpdate = new Tuple2 <>(toBlock , null );
399
436
} else {
400
- LogResult <?> update = updates .getLogs ().getFirst ();
401
- BigInteger updateBlock = ((LogObject ) update ).getBlockNumber ();
402
- SystemConfigUpdate configUpdate = SystemConfigUpdate .tryFrom ((LogObject ) update );
403
- if (updateBlock == null ) {
437
+ BigInteger updateBlockNum = ((LogObject ) updates .getLogs ().getFirst ()).getBlockNumber ();
438
+ SystemConfig updatedConfig = this .systemConfig ;
439
+ boolean updated = false ;
440
+ for (int i = 0 ; i < updates .getLogs ().size (); i ++) {
441
+ LogObject update = (LogObject ) updates .getLogs ().get (i );
442
+ BigInteger updateBlock = update .getBlockNumber ();
443
+ if (updateBlock == null ) {
444
+ break ;
445
+ }
446
+ if (!updateBlock .equals (updateBlockNum )) {
447
+ break ;
448
+ }
449
+ SystemConfigUpdate configUpdate = SystemConfigUpdate .tryFrom (update );
450
+ updatedConfig = parseSystemConfigUpdate (updatedConfig , l1BlockInfo , configUpdate );
451
+ updated = true ;
452
+ }
453
+ if (!updated ) {
404
454
this .systemConfigUpdate = new Tuple2 <>(toBlock , null );
405
455
} else {
406
- SystemConfig updateSystemConfig = parseSystemConfigUpdate (l1BlockInfo , configUpdate );
407
- this .systemConfigUpdate = new Tuple2 <>(updateBlock , updateSystemConfig );
456
+ this .systemConfigUpdate = new Tuple2 <>(updateBlockNum , updatedConfig );
408
457
}
409
458
}
410
459
}
@@ -417,46 +466,47 @@ private void updateSystemConfig(BlockInfo l1BlockInfo) throws ExecutionException
417
466
}
418
467
}
419
468
420
- private Config .SystemConfig parseSystemConfigUpdate (BlockInfo l1BlockInfo , SystemConfigUpdate configUpdate ) {
469
+ private Config .SystemConfig parseSystemConfigUpdate (
470
+ SystemConfig lastSystemConfig , BlockInfo l1BlockInfo , SystemConfigUpdate configUpdate ) {
421
471
Config .SystemConfig updateSystemConfig = null ;
422
472
if (configUpdate instanceof SystemConfigUpdate .BatchSender ) {
423
473
updateSystemConfig = new Config .SystemConfig (
424
474
((SystemConfigUpdate .BatchSender ) configUpdate ).getAddress (),
425
- this . systemConfig .gasLimit (),
426
- this . systemConfig .l1FeeOverhead (),
427
- this . systemConfig .l1FeeScalar (),
428
- this . systemConfig .unsafeBlockSigner ());
475
+ lastSystemConfig .gasLimit (),
476
+ lastSystemConfig .l1FeeOverhead (),
477
+ lastSystemConfig .l1FeeScalar (),
478
+ lastSystemConfig .unsafeBlockSigner ());
429
479
} else if (configUpdate instanceof SystemConfigUpdate .Fees ) {
430
480
var ecotoneTime = this .config .chainConfig ().ecotoneTime ();
431
481
if (ecotoneTime .compareTo (BigInteger .ZERO ) > 0
432
482
&& l1BlockInfo .timestamp ().compareTo (ecotoneTime ) >= 0 ) {
433
483
updateSystemConfig = new Config .SystemConfig (
434
- this . systemConfig .batchSender (),
435
- this . systemConfig .gasLimit (),
484
+ lastSystemConfig .batchSender (),
485
+ lastSystemConfig .gasLimit (),
436
486
BigInteger .ZERO ,
437
487
((SystemConfigUpdate .Fees ) configUpdate ).getFeeScalar (),
438
- this . systemConfig .unsafeBlockSigner ());
488
+ lastSystemConfig .unsafeBlockSigner ());
439
489
} else {
440
490
updateSystemConfig = new Config .SystemConfig (
441
- this . systemConfig .batchSender (),
442
- this . systemConfig .gasLimit (),
491
+ lastSystemConfig .batchSender (),
492
+ lastSystemConfig .gasLimit (),
443
493
((SystemConfigUpdate .Fees ) configUpdate ).getFeeOverhead (),
444
494
((SystemConfigUpdate .Fees ) configUpdate ).getFeeScalar (),
445
- this . systemConfig .unsafeBlockSigner ());
495
+ lastSystemConfig .unsafeBlockSigner ());
446
496
}
447
497
} else if (configUpdate instanceof SystemConfigUpdate .GasLimit ) {
448
498
updateSystemConfig = new Config .SystemConfig (
449
- this . systemConfig .batchSender (),
499
+ lastSystemConfig .batchSender (),
450
500
((SystemConfigUpdate .GasLimit ) configUpdate ).getGas (),
451
- this . systemConfig .l1FeeOverhead (),
452
- this . systemConfig .l1FeeScalar (),
453
- this . systemConfig .unsafeBlockSigner ());
501
+ lastSystemConfig .l1FeeOverhead (),
502
+ lastSystemConfig .l1FeeScalar (),
503
+ lastSystemConfig .unsafeBlockSigner ());
454
504
} else if (configUpdate instanceof SystemConfigUpdate .UnsafeBlockSigner ) {
455
505
updateSystemConfig = new Config .SystemConfig (
456
- this . systemConfig .batchSender (),
457
- this . systemConfig .gasLimit (),
458
- this . systemConfig .l1FeeOverhead (),
459
- this . systemConfig .l1FeeScalar (),
506
+ lastSystemConfig .batchSender (),
507
+ lastSystemConfig .gasLimit (),
508
+ lastSystemConfig .l1FeeOverhead (),
509
+ lastSystemConfig .l1FeeScalar (),
460
510
((SystemConfigUpdate .UnsafeBlockSigner ) configUpdate ).getAddress ());
461
511
}
462
512
return updateSystemConfig ;
@@ -601,6 +651,12 @@ protected void shutDown() {
601
651
if (!this .l1HeadListener .isDisposed ()) {
602
652
this .l1HeadListener .dispose ();
603
653
}
654
+ if (this .wsProvider != null ) {
655
+ this .wsProvider .shutdown ();
656
+ }
657
+ if (this .scheduledExecutorService != null ) {
658
+ this .scheduledExecutorService .shutdown ();
659
+ }
604
660
}
605
661
606
662
@ Override
0 commit comments