diff --git a/Makefile b/Makefile index f09aeed2b..aed9c263f 100644 --- a/Makefile +++ b/Makefile @@ -36,17 +36,22 @@ start-env: L1_CONTRACT_VERSION:=6 start-env: SKIP_CONTRACTS_DEPLOYMENT:=false start-env: LINEA_PROTOCOL_CONTRACTS_ONLY:=false start-env: - if [ "$(CLEAN_PREVIOUS_ENV)" = "true" ]; then \ - make clean-environment; \ + @if [ "$(CLEAN_PREVIOUS_ENV)" = "true" ]; then \ + $(MAKE) clean-environment; \ else \ echo "Starting stack reusing previous state"; \ fi; \ mkdir -p tmp/local; \ L1_GENESIS_TIME=$(get_future_time) COMPOSE_PROFILES=$(COMPOSE_PROFILES) docker compose -f $(COMPOSE_FILE) up -d; \ + while [ "$$(docker compose -f $(COMPOSE_FILE) ps -q l1-el-node | xargs docker inspect -f '{{.State.Health.Status}}')" != "healthy" ] || \ + [ "$$(docker compose -f $(COMPOSE_FILE) ps -q sequencer | xargs docker inspect -f '{{.State.Health.Status}}')" != "healthy" ]; do \ + sleep 2; \ + echo "Checking health status of l1-el-node and sequencer..."; \ + done if [ "$(SKIP_CONTRACTS_DEPLOYMENT)" = "true" ]; then \ echo "Skipping contracts deployment"; \ else \ - make deploy-contracts L1_CONTRACT_VERSION=$(L1_CONTRACT_VERSION) LINEA_PROTOCOL_CONTRACTS_ONLY=$(LINEA_PROTOCOL_CONTRACTS_ONLY); \ + $(MAKE) deploy-contracts L1_CONTRACT_VERSION=$(L1_CONTRACT_VERSION) LINEA_PROTOCOL_CONTRACTS_ONLY=$(LINEA_PROTOCOL_CONTRACTS_ONLY); \ fi start-l1: diff --git a/buildSrc/src/main/groovy/net.consensys.zkevm.kotlin-application-conventions.gradle b/buildSrc/src/main/groovy/net.consensys.zkevm.kotlin-application-conventions.gradle index e76b16c1d..73ec1b9d3 100644 --- a/buildSrc/src/main/groovy/net.consensys.zkevm.kotlin-application-conventions.gradle +++ b/buildSrc/src/main/groovy/net.consensys.zkevm.kotlin-application-conventions.gradle @@ -9,3 +9,14 @@ plugins { // Apply the application plugin to add support for building a CLI application in Java. id 'application' } + +tasks.distTar { + // we don't use the tar distribution + onlyIf { false } +} + +tasks.distZip { + // we only need the zip distribution to build the docker image + // we explicitly call distZip to build the zip distribution + enabled = false +} diff --git a/coordinator/app/build.gradle b/coordinator/app/build.gradle index 241fec9fe..94d577193 100644 --- a/coordinator/app/build.gradle +++ b/coordinator/app/build.gradle @@ -92,11 +92,6 @@ distributions { } } -tasks.distTar { - // we don't need the tar distribution - onlyIf { false } -} - run { workingDir = rootProject.projectDir jvmArgs = [ diff --git a/docker/compose-spec-l2-services.yml b/docker/compose-spec-l2-services.yml index 95fcd4ce6..13af5aba7 100644 --- a/docker/compose-spec-l2-services.yml +++ b/docker/compose-spec-l2-services.yml @@ -513,13 +513,13 @@ services: blobscan-api: container_name: blobscan-api hostname: blobscan-api - image: blossomlabs/blobscan-api:1.1.0 + image: blossomlabs/blobscan-api:1.3.1 platform: linux/amd64 # only linux available profiles: [ "staterecovery" ] ports: - "4001:4001" env_file: "./config/blobscan/env" - restart: no + restart: always # healthcheck: # test: [ "CMD", "curl", "-f", "http://localhost:4001/healthcheck" ] # disable: true @@ -528,7 +528,6 @@ services: # retries: 20 # start_period: 5s networks: - linea: l1network: ipv4_address: 10.10.10.203 depends_on: @@ -549,7 +548,6 @@ services: profiles: [ "staterecovery" ] env_file: "./config/blobscan/env" networks: - linea: l1network: ipv4_address: 10.10.10.204 restart: always diff --git a/docker/config/blobscan/env b/docker/config/blobscan/env index 6a57973e2..1c39ec098 100644 --- a/docker/config/blobscan/env +++ b/docker/config/blobscan/env @@ -22,7 +22,7 @@ BLOBSCAN_API_PORT=4001 EXTERNAL_API_PORT=4001 CHAIN_ID=31648428 DENCUN_FORK_SLOT=0 -LOG_LEVEL=debug +LOG_LEVEL=warn REDIS_URI=redis://redis:6379/1 # SENTRY_DSN_API= BLOB_PROPAGATOR_ENABLED=false diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 2291c4b1d..3a68adf8e 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -19,7 +19,6 @@ restassured = "5.3.0" wiremock = "3.0.1" # Runtime -besu = "24.12.2" blobCompressor = "0.0.4" blobShnarfCalculator = "0.0.4" bouncycastle = "1.79" diff --git a/jvm-libs/linea/besu-libs/build.gradle b/jvm-libs/linea/besu-libs/build.gradle index c9cb18675..99c4f39ae 100644 --- a/jvm-libs/linea/besu-libs/build.gradle +++ b/jvm-libs/linea/besu-libs/build.gradle @@ -3,10 +3,8 @@ plugins { id 'java-library' } -//def besuArtifactGroup="org.hyperledger.besu" -//def besuVersion=libs.versions.besu.get() def besuArtifactGroup="io.consensys.linea-besu" -def besuVersion="25.2-delivery46" +def besuVersion="25.2-delivery48" dependencies { api("${besuArtifactGroup}:besu-datatypes:${besuVersion}") { diff --git a/jvm-libs/linea/testing/l1-blob-and-proof-submission/src/main/kotlin/net/consensys/linea/testing/submission/SubmissionTestHelper.kt b/jvm-libs/linea/testing/l1-blob-and-proof-submission/src/main/kotlin/net/consensys/linea/testing/submission/SubmissionTestHelper.kt index 6ea454814..a69e620e7 100644 --- a/jvm-libs/linea/testing/l1-blob-and-proof-submission/src/main/kotlin/net/consensys/linea/testing/submission/SubmissionTestHelper.kt +++ b/jvm-libs/linea/testing/l1-blob-and-proof-submission/src/main/kotlin/net/consensys/linea/testing/submission/SubmissionTestHelper.kt @@ -12,6 +12,7 @@ import org.web3j.protocol.Web3j import tech.pegasys.teku.infrastructure.async.SafeFuture import kotlin.time.Duration import kotlin.time.Duration.Companion.minutes +import kotlin.time.Duration.Companion.seconds fun assertTxSuccess( txHash: String, @@ -24,6 +25,7 @@ fun assertTxSuccess( txHash = txHash, timeout = timeout ).also { txReceipt -> + println("txHash=$txHash, receiptStatus=${txReceipt.status}") assertThat(txReceipt.status) .withFailMessage( "submission of $submissionType=${interval.intervalString()}" + @@ -37,11 +39,14 @@ fun assertTxsSuccess( txsAndInterval: List>, submissionType: String, l1Web3jClient: Web3j, - timeout: Duration = 1.minutes + timeout: Duration = 1.minutes, + log: Logger = LogManager.getLogger("linea.testing.submission") ) { SafeFuture.supplyAsync { txsAndInterval.forEach { (txHash, interval) -> + log.info("waiting for tx={} to be mined", txHash) assertTxSuccess(txHash, interval, submissionType, l1Web3jClient, timeout) + log.info("waiting for tx={} to be mined", txHash) } } .get(timeout.inWholeMilliseconds, java.util.concurrent.TimeUnit.MILLISECONDS) @@ -55,6 +60,8 @@ fun submitBlobs( contractClient: LineaRollupSmartContractClient, aggregationsAndBlobs: List, blobChunksSize: Int = 6, + awaitForPreviousTxBeforeSubmittingNext: Boolean = false, + l1Web3jClient: Web3j, log: Logger ): List>> { require(blobChunksSize in 1..6) { "blobChunksSize must be between 1..6" } @@ -64,12 +71,18 @@ fun submitBlobs( val blobChunks = aggBlobs.chunked(blobChunksSize) blobChunks.map { blobs -> val txHash = contractClient.submitBlobs(blobs, gasPriceCaps = null).get() + val blobsLogInfo = blobs.map(BlockInterval::intervalString) log.info( "submitting blobs: aggregation={} blobsChunk={} txHash={}", agg?.intervalString(), - blobs.map { it.intervalString() }, + blobsLogInfo, txHash ) + if (awaitForPreviousTxBeforeSubmittingNext) { + log.debug("waiting for blobsChunk={} txHash={} to be mined", blobsLogInfo, txHash) + assertTxSuccess(txHash, blobs.first(), "blobs", l1Web3jClient, 20.seconds) + log.info(" blobsChunk={} txHash={} mined", blobsLogInfo, txHash) + } txHash to blobs } @@ -86,35 +99,44 @@ fun submitBlobsAndAggregationsAndWaitExecution( waitTimeout: Duration = 2.minutes, log: Logger = LogManager.getLogger("linea.testing.submission") ) { - val blobSubmissionTxHashes = submitBlobs( + val blobSubmissions = submitBlobs( contractClientForBlobSubmission, aggregationsAndBlobs, blobChunksMaxSize, - log + awaitForPreviousTxBeforeSubmittingNext = false, + l1Web3jClient = l1Web3jClient, + log = log ) assertTxsSuccess( - txsAndInterval = blobSubmissionTxHashes.map { (txHash, blobs) -> + txsAndInterval = blobSubmissions.map { (txHash, blobs) -> txHash to BlockInterval(blobs.first().startBlockNumber, blobs.last().endBlockNumber) }, submissionType = "blobs", l1Web3jClient = l1Web3jClient, timeout = waitTimeout ) + log.info("blob={} txHash={} executed on L1", blobSubmissions.last().second, blobSubmissions.last().first) val submissions = aggregationsAndBlobs .filter { it.aggregation != null } .mapIndexed { index, (aggregation, aggBlobs) -> aggregation as Aggregation val parentAgg = aggregationsAndBlobs.getOrNull(index - 1)?.aggregation - contractClientForAggregationSubmission.finalizeBlocks( + val txHash = contractClientForAggregationSubmission.finalizeBlocks( aggregation = aggregation.aggregationProof!!, aggregationLastBlob = aggBlobs.last(), parentShnarf = aggBlobs.first().blobCompressionProof!!.prevShnarf, parentL1RollingHash = parentAgg?.aggregationProof?.l1RollingHash ?: ByteArray(32), parentL1RollingHashMessageNumber = parentAgg?.aggregationProof?.l1RollingHashMessageNumber ?: 0L, gasPriceCaps = null - ).get() to aggregation + ).get() + log.info( + "submitting aggregation={} txHash={}", + aggregation.intervalString(), + txHash + ) + txHash to aggregation } assertTxsSuccess( @@ -123,4 +145,10 @@ fun submitBlobsAndAggregationsAndWaitExecution( l1Web3jClient = l1Web3jClient, timeout = waitTimeout ) + + log.info( + "aggregation={} txHash={} executed on L1", + submissions.last().second.intervalString(), + submissions.last().first + ) } diff --git a/jvm-libs/linea/web3j-extensions/src/main/kotlin/linea/web3j/Web3JLogsSearcher.kt b/jvm-libs/linea/web3j-extensions/src/main/kotlin/linea/web3j/Web3JLogsSearcher.kt index d13dbcf26..1d08ce1e4 100644 --- a/jvm-libs/linea/web3j-extensions/src/main/kotlin/linea/web3j/Web3JLogsSearcher.kt +++ b/jvm-libs/linea/web3j-extensions/src/main/kotlin/linea/web3j/Web3JLogsSearcher.kt @@ -19,6 +19,7 @@ import org.web3j.protocol.core.methods.request.EthFilter import org.web3j.protocol.core.methods.response.EthLog import org.web3j.protocol.core.methods.response.Log import tech.pegasys.teku.infrastructure.async.SafeFuture +import java.util.concurrent.atomic.AtomicReference import kotlin.time.Duration import kotlin.time.Duration.Companion.milliseconds @@ -73,22 +74,32 @@ class Web3JLogsSearcher( val cursor = SearchCursor(fromBlock, toBlock, chunkSize) log.trace("searching between blocks={}", CommonDomainFunctions.blockIntervalString(fromBlock, toBlock)) - var nextChunkToSearch: Pair? = cursor.next(searchDirection = SearchDirection.FORWARD) + val nextChunkToSearchRef: AtomicReference?> = + AtomicReference(cursor.next(searchDirection = SearchDirection.FORWARD)) return AsyncRetryer.retry( vertx, backoffDelay = config.backoffDelay, - stopRetriesPredicate = { it is SearchResult.ItemFound || nextChunkToSearch == null } + stopRetriesPredicate = { + it is SearchResult.ItemFound || nextChunkToSearchRef.get() == null + } ) { - val (chunkStart, chunkEnd) = nextChunkToSearch!! - log.trace("searching in chunk={}", CommonDomainFunctions.blockIntervalString(chunkStart, chunkEnd)) + log.trace("searching in chunk={}", nextChunkToSearchRef.get()) + val (chunkStart, chunkEnd) = nextChunkToSearchRef.get()!! + val chunkInterval = CommonDomainFunctions.blockIntervalString(chunkStart, chunkEnd) findLogInInterval(chunkStart, chunkEnd, address, topics, shallContinueToSearchPredicate) .thenPeek { result -> if (result is SearchResult.NoResultsInInterval) { - nextChunkToSearch = cursor.next(searchDirection = null) + nextChunkToSearchRef.set(cursor.next(searchDirection = null)) } else if (result is SearchResult.KeepSearching) { // need to search in the same chunk - nextChunkToSearch = cursor.next(searchDirection = result.direction) + nextChunkToSearchRef.set(cursor.next(searchDirection = result.direction)) } + log.trace( + "search result chunk={} searchResult={} nextChunkToSearch={}", + chunkInterval, + result, + nextChunkToSearchRef.get() + ) } }.thenApply { either -> when (either) { @@ -195,27 +206,32 @@ class Web3JLogsSearcher( fromBlock: BlockParameter, toBlock: BlockParameter ): SafeFuture> { - return if (fromBlock is BlockParameter.BlockNumber && toBlock is BlockParameter.BlockNumber) { - return SafeFuture.completedFuture(Pair(fromBlock.getNumber(), toBlock.getNumber())) + return SafeFuture.collectAll( + getBlockParameterNumber(fromBlock), + getBlockParameterNumber(toBlock) + ).thenApply { (start, end) -> + start to end + } + } + + private fun getBlockParameterNumber(blockParameter: BlockParameter): SafeFuture { + return if (blockParameter is BlockParameter.BlockNumber) { + SafeFuture.completedFuture(blockParameter.getNumber()) + } else if (blockParameter == BlockParameter.Tag.EARLIEST) { + SafeFuture.completedFuture(0UL) } else { AsyncRetryer.retry( vertx = vertx, backoffDelay = config.backoffDelay, - stopRetriesPredicate = { (fromBlockResponse, toBlockResponse) -> - fromBlockResponse?.block?.number != null && toBlockResponse?.block?.number != null + stopRetriesPredicate = { response -> + response?.block?.number != null }, action = { - SafeFuture.collectAll( - web3jClient.ethGetBlockByNumber(fromBlock.toWeb3j(), false).sendAsync().toSafeFuture(), - web3jClient.ethGetBlockByNumber(toBlock.toWeb3j(), false).sendAsync().toSafeFuture() - ) + web3jClient.ethGetBlockByNumber(blockParameter.toWeb3j(), false).sendAsync().toSafeFuture() } ) - .thenApply { (fromBlockResponse, toBlockResponse) -> - Pair( - fromBlockResponse.block.number.toULong(), - toBlockResponse.block.number.toULong() - ) + .thenApply { response -> + response.block.number.toULong() } } } diff --git a/state-recovery/appcore/logic/src/main/kotlin/linea/staterecovery/StateRecoveryApp.kt b/state-recovery/appcore/logic/src/main/kotlin/linea/staterecovery/StateRecoveryApp.kt index 6e95de25c..8bb467f3a 100644 --- a/state-recovery/appcore/logic/src/main/kotlin/linea/staterecovery/StateRecoveryApp.kt +++ b/state-recovery/appcore/logic/src/main/kotlin/linea/staterecovery/StateRecoveryApp.kt @@ -70,6 +70,7 @@ class StateRecoveryApp( "contract address mismatch: config=${config.smartContractAddress} client=${lineaContractClient.getAddress()}" } } + private val l1EventsClient = LineaSubmissionEventsClientImpl( logsSearcher = ethLogsSearcher, smartContractAddress = config.smartContractAddress, @@ -166,16 +167,25 @@ class StateRecoveryApp( vertx = vertx, backoffDelay = config.executionClientPollingInterval, stopRetriesPredicate = { recoveryStatus -> - log.debug( - "waiting for node to sync until stateRecoverStartBlockNumber={} headBlockNumber={}", - recoveryStatus.stateRecoverStartBlockNumber, - recoveryStatus.headBlockNumber - ) // headBlockNumber shall be at least 1 block behind of stateRecoverStartBlockNumber // if it is after it means it was already enabled - recoveryStatus.stateRecoverStartBlockNumber?.let { startBlockNumber -> + val hasReachedTargetBlock = recoveryStatus.stateRecoverStartBlockNumber?.let { startBlockNumber -> recoveryStatus.headBlockNumber + 1u >= startBlockNumber } ?: false + if (hasReachedTargetBlock) { + log.info( + "node reached recovery target block: stateRecoverStartBlockNumber={} headBlockNumber={}", + recoveryStatus.stateRecoverStartBlockNumber, + recoveryStatus.headBlockNumber + ) + } else { + log.info( + "waiting for node to sync until stateRecoverStartBlockNumber={} - 1, headBlockNumber={}", + recoveryStatus.stateRecoverStartBlockNumber, + recoveryStatus.headBlockNumber + ) + } + hasReachedTargetBlock } ) { elClient.lineaGetStateRecoveryStatus() diff --git a/state-recovery/appcore/logic/src/main/kotlin/linea/staterecovery/StateSynchronizerService.kt b/state-recovery/appcore/logic/src/main/kotlin/linea/staterecovery/StateSynchronizerService.kt index 6026515d0..99294bd39 100644 --- a/state-recovery/appcore/logic/src/main/kotlin/linea/staterecovery/StateSynchronizerService.kt +++ b/state-recovery/appcore/logic/src/main/kotlin/linea/staterecovery/StateSynchronizerService.kt @@ -70,18 +70,16 @@ class StateSynchronizerService( } return findNextFinalization() - .thenPeek { nextFinalization -> - log.debug( - "sync state loop: lastSuccessfullyProcessedFinalization={} nextFinalization={}", - lastSuccessfullyProcessedFinalization?.event?.intervalString(), - nextFinalization?.event?.intervalString() - ) - } .thenCompose { nextFinalization -> if (nextFinalization == null) { // nothing to do for now SafeFuture.completedFuture(null) } else { + log.debug( + "sync state loop: lastSuccessfullyProcessedFinalization={} nextFinalization={}", + lastSuccessfullyProcessedFinalization?.event?.intervalString(), + nextFinalization.event.intervalString() + ) submissionEventsClient .findDataSubmittedV3EventsUntilNextFinalization( l2StartBlockNumberInclusive = nextFinalization.event.startBlockNumber diff --git a/state-recovery/besu-plugin/src/main/kotlin/linea/staterecovery/plugin/AppConfigurator.kt b/state-recovery/besu-plugin/src/main/kotlin/linea/staterecovery/plugin/AppConfigurator.kt index 52d2a9445..87b6a8a5c 100644 --- a/state-recovery/besu-plugin/src/main/kotlin/linea/staterecovery/plugin/AppConfigurator.kt +++ b/state-recovery/besu-plugin/src/main/kotlin/linea/staterecovery/plugin/AppConfigurator.kt @@ -6,6 +6,7 @@ import build.linea.contract.l1.Web3JLineaRollupSmartContractClientReadOnly import io.micrometer.core.instrument.MeterRegistry import io.vertx.core.Vertx import io.vertx.micrometer.backends.BackendRegistries +import linea.domain.RetryConfig import linea.staterecovery.BlockHeaderStaticFields import linea.staterecovery.ExecutionLayerClient import linea.staterecovery.StateRecoveryApp @@ -62,15 +63,24 @@ data class AppClients( val transactionDetailsClient: TransactionDetailsClient ) +fun RetryConfig.toRequestRetryConfig(): RequestRetryConfig { + return RequestRetryConfig( + maxRetries = this.maxRetries, + timeout = this.timeout, + backoffDelay = this.backoffDelay, + failuresWarningThreshold = this.failuresWarningThreshold + ) +} + fun createAppClients( vertx: Vertx = Vertx.vertx(), meterRegistry: MeterRegistry = BackendRegistries.getDefaultNow(), l1RpcEndpoint: URI, - l1RpcRequestRetryConfig: RequestRetryConfig = RequestRetryConfig(backoffDelay = 1.seconds), + l1RpcRequestRetryConfig: RetryConfig = RetryConfig(backoffDelay = 1.seconds), blobScanEndpoint: URI, - blobScanRequestRetryConfig: RequestRetryConfig = RequestRetryConfig(backoffDelay = 1.seconds), + blobScanRequestRetryConfig: RetryConfig = RetryConfig(backoffDelay = 1.seconds), stateManagerClientEndpoint: URI, - stateManagerRequestRetry: RequestRetryConfig = RequestRetryConfig(backoffDelay = 1.seconds), + stateManagerRequestRetry: RetryConfig = RetryConfig(backoffDelay = 1.seconds), zkStateManagerVersion: String = "2.3.0", appConfig: StateRecoveryApp.Config ): AppClients { @@ -89,13 +99,17 @@ fun createAppClients( rpcUrl = l1RpcEndpoint.toString(), log = log ), + config = Web3JLogsSearcher.Config( + backoffDelay = 1.seconds, + requestRetryConfig = RetryConfig() + ), log = log ) } val blobScanClient = BlobScanClient.create( vertx = vertx, endpoint = blobScanEndpoint, - requestRetryConfig = blobScanRequestRetryConfig, + requestRetryConfig = blobScanRequestRetryConfig.toRequestRetryConfig(), logger = LogManager.getLogger("linea.plugin.staterecovery.clients.l1.blob-scan") ) val jsonRpcClientFactory = VertxHttpJsonRpcClientFactory(vertx, MicrometerMetricsFacade(meterRegistry)) @@ -103,14 +117,14 @@ fun createAppClients( rpcClientFactory = jsonRpcClientFactory, endpoints = listOf(stateManagerClientEndpoint), maxInflightRequestsPerClient = 10u, - requestRetry = stateManagerRequestRetry, + requestRetry = stateManagerRequestRetry.toRequestRetryConfig(), zkStateManagerVersion = zkStateManagerVersion, logger = LogManager.getLogger("linea.plugin.staterecovery.clients.state-manager") ) val transactionDetailsClient: TransactionDetailsClient = VertxTransactionDetailsClient.create( jsonRpcClientFactory = jsonRpcClientFactory, endpoint = l1RpcEndpoint, - retryConfig = l1RpcRequestRetryConfig, + retryConfig = l1RpcRequestRetryConfig.toRequestRetryConfig(), logger = LogManager.getLogger("linea.plugin.staterecovery.clients.l1.transaction-details") ) return AppClients( diff --git a/state-recovery/besu-plugin/src/main/kotlin/linea/staterecovery/plugin/BlockImporter.kt b/state-recovery/besu-plugin/src/main/kotlin/linea/staterecovery/plugin/BlockImporter.kt index 71a4802c4..144a2f894 100644 --- a/state-recovery/besu-plugin/src/main/kotlin/linea/staterecovery/plugin/BlockImporter.kt +++ b/state-recovery/besu-plugin/src/main/kotlin/linea/staterecovery/plugin/BlockImporter.kt @@ -60,18 +60,6 @@ class BlockImporter( return executedBlockResult } - private fun createOverrides(blockFromBlob: BlockFromL1RecoveredData): BlockOverrides { - return BlockOverrides.builder() - .blockHash(Hash.wrap(Bytes32.wrap(blockFromBlob.header.blockHash))) - .feeRecipient(Address.fromHexString(blockFromBlob.header.coinbase.encodeHex())) - .blockNumber(blockFromBlob.header.blockNumber.toLong()) - .gasLimit(blockFromBlob.header.gasLimit.toLong()) - .timestamp(blockFromBlob.header.blockTimestamp.epochSeconds) - .difficulty(blockFromBlob.header.difficulty.toBigInteger()) - .mixHashOrPrevRandao(Hash.ZERO) - .build() - } - fun importBlock(context: BlockContext): PluginBlockSimulationResult { log.trace( "calling simulateAndPersistWorldState block={} blockHeader={}", @@ -95,18 +83,6 @@ class BlockImporter( return importedBlockResult } - private fun createOverrides(blockHeader: BlockHeader): BlockOverrides { - return BlockOverrides.builder() - .feeRecipient(blockHeader.coinbase) - .blockNumber(blockHeader.number) - .gasLimit(blockHeader.gasLimit) - .timestamp(blockHeader.timestamp) - .difficulty(blockHeader.difficulty.asBigInteger) - .stateRoot(blockHeader.stateRoot) - .mixHashOrPrevRandao(Hash.ZERO) - .build() - } - private fun storeAndSetHead(block: PluginBlockSimulationResult) { log.debug( "storeAndSetHead result: blockHeader={}", @@ -119,4 +95,30 @@ class BlockImporter( ) synchronizationService.setHeadUnsafe(block.blockHeader, block.blockBody) } + + companion object { + fun createOverrides(blockFromBlob: BlockFromL1RecoveredData): BlockOverrides { + return BlockOverrides.builder() + .blockHash(Hash.wrap(Bytes32.wrap(blockFromBlob.header.blockHash))) + .feeRecipient(Address.fromHexString(blockFromBlob.header.coinbase.encodeHex())) + .blockNumber(blockFromBlob.header.blockNumber.toLong()) + .gasLimit(blockFromBlob.header.gasLimit.toLong()) + .timestamp(blockFromBlob.header.blockTimestamp.epochSeconds) + .difficulty(blockFromBlob.header.difficulty.toBigInteger()) + .mixHashOrPrevRandao(Hash.ZERO) + .build() + } + + fun createOverrides(blockHeader: BlockHeader): BlockOverrides { + return BlockOverrides.builder() + .feeRecipient(blockHeader.coinbase) + .blockNumber(blockHeader.number) + .gasLimit(blockHeader.gasLimit) + .timestamp(blockHeader.timestamp) + .difficulty(blockHeader.difficulty.asBigInteger) + .stateRoot(blockHeader.stateRoot) + .mixHashOrPrevRandao(Hash.ZERO) + .build() + } + } } diff --git a/state-recovery/test-cases/build.gradle b/state-recovery/test-cases/build.gradle index a66121e53..fcc6099ba 100644 --- a/state-recovery/test-cases/build.gradle +++ b/state-recovery/test-cases/build.gradle @@ -19,6 +19,7 @@ dependencies { api(project(':state-recovery:appcore:clients-interfaces')) api(project(':state-recovery:appcore:domain-models')) api(project(':state-recovery:appcore:logic')) + api(project(':state-recovery:besu-plugin')) implementation project(':jvm-libs:linea:besu-libs') implementation project(':jvm-libs:linea:testing:file-system') diff --git a/state-recovery/test-cases/src/integrationTest/kotlin/linea/staterecovery/StateRecoveryAppWithFakeExecutionClientIntTest.kt b/state-recovery/test-cases/src/integrationTest/kotlin/linea/staterecovery/StateRecoveryAppWithFakeExecutionClientIntTest.kt index 864768b8f..9b9d1741a 100644 --- a/state-recovery/test-cases/src/integrationTest/kotlin/linea/staterecovery/StateRecoveryAppWithFakeExecutionClientIntTest.kt +++ b/state-recovery/test-cases/src/integrationTest/kotlin/linea/staterecovery/StateRecoveryAppWithFakeExecutionClientIntTest.kt @@ -1,24 +1,18 @@ package linea.staterecovery import build.linea.contract.l1.LineaContractVersion -import build.linea.contract.l1.LineaRollupSmartContractClientReadOnly -import build.linea.contract.l1.Web3JLineaRollupSmartContractClientReadOnly -import io.micrometer.core.instrument.simple.SimpleMeterRegistry import io.vertx.core.Vertx import io.vertx.junit5.VertxExtension import linea.domain.RetryConfig import linea.log4j.configureLoggers -import linea.staterecovery.clients.VertxTransactionDetailsClient -import linea.staterecovery.clients.blobscan.BlobScanClient +import linea.staterecovery.plugin.AppClients +import linea.staterecovery.plugin.createAppClients import linea.staterecovery.test.FakeExecutionLayerClient import linea.staterecovery.test.FakeStateManagerClient import linea.staterecovery.test.FakeStateManagerClientBasedOnBlobsRecords -import linea.web3j.Web3JLogsSearcher +import linea.web3j.createWeb3jHttpClient import net.consensys.linea.BlockNumberAndHash import net.consensys.linea.BlockParameter -import net.consensys.linea.jsonrpc.client.RequestRetryConfig -import net.consensys.linea.jsonrpc.client.VertxHttpJsonRpcClientFactory -import net.consensys.linea.metrics.micrometer.MicrometerMetricsFacade import net.consensys.linea.testing.submission.AggregationAndBlobs import net.consensys.linea.testing.submission.loadBlobsAndAggregationsSortedAndGrouped import net.consensys.linea.testing.submission.submitBlobsAndAggregationsAndWaitExecution @@ -26,13 +20,13 @@ import net.consensys.zkevm.coordinator.clients.smartcontract.LineaRollupSmartCon import net.consensys.zkevm.ethereum.ContractsManager import net.consensys.zkevm.ethereum.MakeFileDelegatedContractsManager.connectToLineaRollupContract import net.consensys.zkevm.ethereum.MakeFileDelegatedContractsManager.lineaRollupContractErrors -import net.consensys.zkevm.ethereum.Web3jClientManager import org.apache.logging.log4j.Level import org.apache.logging.log4j.LogManager import org.assertj.core.api.Assertions.assertThat import org.awaitility.Awaitility.await import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test +import org.junit.jupiter.api.TestInstance import org.junit.jupiter.api.extension.ExtendWith import java.net.URI import kotlin.time.Duration @@ -42,20 +36,17 @@ import kotlin.time.Duration.Companion.seconds import kotlin.time.toJavaDuration @ExtendWith(VertxExtension::class) +@TestInstance(TestInstance.Lifecycle.PER_CLASS) class StateRecoveryAppWithFakeExecutionClientIntTest { private val log = LogManager.getLogger("test.case.StateRecoverAppWithFakeExecutionClientIntTest") private lateinit var stateRecoverApp: StateRecoveryApp private lateinit var aggregationsAndBlobs: List - private lateinit var executionLayerClient: FakeExecutionLayerClient + private lateinit var fakeExecutionLayerClient: FakeExecutionLayerClient private lateinit var fakeStateManagerClient: FakeStateManagerClient - private lateinit var transactionDetailsClient: TransactionDetailsClient - private lateinit var lineaContractClient: LineaRollupSmartContractClientReadOnly - private lateinit var contractClientForBlobSubmissions: LineaRollupSmartContractClient private lateinit var contractClientForAggregationSubmissions: LineaRollupSmartContractClient - private lateinit var blobScanClient: BlobScanClient - private lateinit var logsSearcher: Web3JLogsSearcher private lateinit var vertx: Vertx + private lateinit var appClients: AppClients private val testDataDir = run { "testdata/coordinator/prover/v3" @@ -67,54 +58,36 @@ class StateRecoveryAppWithFakeExecutionClientIntTest { @BeforeEach fun beforeEach(vertx: Vertx) { this.vertx = vertx - val jsonRpcFactory = VertxHttpJsonRpcClientFactory( - vertx = vertx, - metricsFacade = MicrometerMetricsFacade(SimpleMeterRegistry()) - ) + vertx.exceptionHandler { + log.error("unhandled exception", it) + } aggregationsAndBlobs = loadBlobsAndAggregationsSortedAndGrouped( blobsResponsesDir = "$testDataDir/compression/responses", aggregationsResponsesDir = "$testDataDir/aggregation/responses" ) - executionLayerClient = FakeExecutionLayerClient( + fakeExecutionLayerClient = FakeExecutionLayerClient( headBlock = BlockNumberAndHash(number = 0uL, hash = ByteArray(32) { 0 }), initialStateRecoverStartBlockNumber = null, loggerName = "test.fake.clients.l1.fake-execution-layer" ) fakeStateManagerClient = FakeStateManagerClientBasedOnBlobsRecords(blobRecords = aggregationsAndBlobs.flatMap { it.blobs }) - transactionDetailsClient = VertxTransactionDetailsClient.create( - jsonRpcClientFactory = jsonRpcFactory, - endpoint = URI(l1RpcUrl), - retryConfig = RequestRetryConfig( - backoffDelay = 10.milliseconds, - timeout = 2.seconds - ), - logger = LogManager.getLogger("test.clients.l1.transaction-details") - ) val rollupDeploymentResult = ContractsManager.get() .deployLineaRollup(numberOfOperators = 2, contractVersion = LineaContractVersion.V6).get() - lineaContractClient = Web3JLineaRollupSmartContractClientReadOnly( - web3j = Web3jClientManager.buildL1Client( - log = LogManager.getLogger("test.clients.l1.linea-contract"), - requestResponseLogLevel = Level.INFO, - failuresLogLevel = Level.WARN - ), - contractAddress = rollupDeploymentResult.contractAddress - ) - this.logsSearcher = Web3JLogsSearcher( + appClients = createAppClients( vertx = vertx, - web3jClient = Web3jClientManager.buildL1Client( - log = LogManager.getLogger("test.clients.l1.events-fetcher"), - requestResponseLogLevel = Level.TRACE, - failuresLogLevel = Level.WARN - ), - Web3JLogsSearcher.Config( - backoffDelay = 1.milliseconds, - requestRetryConfig = RetryConfig.noRetries - ), - log = LogManager.getLogger("test.clients.l1.events-fetcher") + l1RpcEndpoint = URI(l1RpcUrl), + l1RpcRequestRetryConfig = RetryConfig(backoffDelay = 2.seconds), + blobScanEndpoint = URI(blobScanUrl), + stateManagerClientEndpoint = URI("http://it-does-not-matter:5432"), + appConfig = StateRecoveryApp.Config( + l1LatestSearchBlock = BlockParameter.Tag.LATEST, + l1PollingInterval = 10.milliseconds, + executionClientPollingInterval = 1.seconds, + smartContractAddress = rollupDeploymentResult.contractAddress + ) ) contractClientForBlobSubmissions = rollupDeploymentResult.rollupOperatorClient @@ -123,33 +96,19 @@ class StateRecoveryAppWithFakeExecutionClientIntTest { rollupDeploymentResult.rollupOperators[1].txManager, smartContractErrors = lineaRollupContractErrors ) - this.blobScanClient = BlobScanClient.create( - vertx = vertx, - endpoint = URI(blobScanUrl), - requestRetryConfig = RequestRetryConfig( - backoffDelay = 10.milliseconds, - timeout = 2.seconds - ), - responseLogMaxSize = 1000u, - logger = LogManager.getLogger("test.clients.l1.blobscan") - ) instantiateStateRecoveryApp() configureLoggers( rootLevel = Level.INFO, - log.name to Level.INFO, + log.name to Level.DEBUG, "linea.testing.submission" to Level.INFO, "net.consensys.linea.contract.Web3JContractAsyncHelper" to Level.WARN, // silence noisy gasPrice Caps logs - "test.clients.l1.executionlayer" to Level.DEBUG, - "test.clients.l1.web3j-default" to Level.INFO, - "test.clients.l1.state-manager" to Level.INFO, - "test.clients.l1.transaction-details" to Level.INFO, - "test.clients.l1.linea-contract" to Level.INFO, - "test.clients.l1.events-fetcher" to Level.INFO, - "test.clients.l1.blobscan" to Level.INFO, - "net.consensys.linea.contract.l1" to Level.INFO, - "test.fake.clients.l1.fake-execution-layer" to Level.INFO + "linea.staterecovery.BlobDecompressorToDomainV1" to Level.DEBUG, + "linea.plugin.staterecovery.clients" to Level.DEBUG, + "test.fake.clients.l1.fake-execution-layer" to Level.DEBUG, + "test.clients.l1.web3j-default" to Level.DEBUG, + "test.clients.l1.web3j.receipt-poller" to Level.TRACE ) } @@ -158,18 +117,19 @@ class StateRecoveryAppWithFakeExecutionClientIntTest { ) { stateRecoverApp = StateRecoveryApp( vertx = vertx, - elClient = executionLayerClient, - blobFetcher = blobScanClient, - ethLogsSearcher = logsSearcher, + elClient = fakeExecutionLayerClient, + blobFetcher = appClients.blobScanClient, + ethLogsSearcher = appClients.ethLogsSearcher, stateManagerClient = fakeStateManagerClient, - transactionDetailsClient = transactionDetailsClient, + transactionDetailsClient = appClients.transactionDetailsClient, blockHeaderStaticFields = BlockHeaderStaticFields.localDev, - lineaContractClient = lineaContractClient, + lineaContractClient = appClients.lineaContractClient, config = StateRecoveryApp.Config( + l1EarliestSearchBlock = BlockParameter.Tag.EARLIEST, l1LatestSearchBlock = BlockParameter.Tag.LATEST, - l1PollingInterval = 10.milliseconds, + l1PollingInterval = 1.seconds, executionClientPollingInterval = 1.seconds, - smartContractAddress = lineaContractClient.getAddress(), + smartContractAddress = appClients.lineaContractClient.contractAddress, debugForceSyncStopBlockNumber = debugForceSyncStopBlockNumber ) ) @@ -186,7 +146,10 @@ class StateRecoveryAppWithFakeExecutionClientIntTest { aggregationsAndBlobs = aggregationsAndBlobs, blobChunksMaxSize = blobChunksSize, waitTimeout = waitTimeout, - l1Web3jClient = Web3jClientManager.l1Client, + l1Web3jClient = createWeb3jHttpClient( + rpcUrl = l1RpcUrl, + log = LogManager.getLogger("test.clients.l1.web3j.receipt-poller") + ), log = log ) } @@ -214,7 +177,7 @@ class StateRecoveryAppWithFakeExecutionClientIntTest { .isEqualTo(lastAggregation!!.endBlockNumber) } - assertThat(executionLayerClient.lineaGetStateRecoveryStatus().get()) + assertThat(fakeExecutionLayerClient.lineaGetStateRecoveryStatus().get()) .isEqualTo( StateRecoveryStatus( headBlockNumber = lastAggregation!!.endBlockNumber, @@ -249,7 +212,7 @@ class StateRecoveryAppWithFakeExecutionClientIntTest { aggregationsAndBlobs = finalizationsBeforeCutOff ) - executionLayerClient.headBlock = BlockNumberAndHash( + fakeExecutionLayerClient.headBlock = BlockNumberAndHash( number = 1UL, hash = ByteArray(32) { 0 } ) @@ -262,17 +225,17 @@ class StateRecoveryAppWithFakeExecutionClientIntTest { .atMost(4.minutes.toJavaDuration()) .pollInterval(1.seconds.toJavaDuration()) .untilAsserted { - assertThat(executionLayerClient.stateRecoverStatus).isEqualTo( + assertThat(fakeExecutionLayerClient.stateRecoverStatus).isEqualTo( StateRecoveryStatus( headBlockNumber = 1UL, stateRecoverStartBlockNumber = expectedStateRecoverStartBlockNumber ) ) - log.info("stateRecoverStatus={}", executionLayerClient.stateRecoverStatus) + log.info("stateRecoverStatus={}", fakeExecutionLayerClient.stateRecoverStatus) } // simulate that execution client has synced up to the last finalized block through P2P network - executionLayerClient.headBlock = BlockNumberAndHash( + fakeExecutionLayerClient.headBlock = BlockNumberAndHash( number = lastFinalizedBlockNumber, hash = ByteArray(32) { 0 } ) @@ -287,11 +250,11 @@ class StateRecoveryAppWithFakeExecutionClientIntTest { .atMost(1.minutes.toJavaDuration()) .pollInterval(1.seconds.toJavaDuration()) .untilAsserted { - assertThat(executionLayerClient.headBlock.number).isEqualTo(lastAggregation!!.endBlockNumber) + assertThat(fakeExecutionLayerClient.headBlock.number).isEqualTo(lastAggregation!!.endBlockNumber) } // assert it imports correct blocks - val importedBlocks = executionLayerClient.importedBlockNumbersInRecoveryMode + val importedBlocks = fakeExecutionLayerClient.importedBlockNumbersInRecoveryMode assertThat(importedBlocks.first()).isEqualTo(expectedStateRecoverStartBlockNumber) assertThat(importedBlocks.last()).isEqualTo(lastAggregation!!.endBlockNumber) } @@ -324,7 +287,7 @@ class StateRecoveryAppWithFakeExecutionClientIntTest { // set execution layer head block after latest finalization val headBlockNumberAtStart = finalizationsBeforeCutOff.last().aggregation!!.endBlockNumber + 1UL - executionLayerClient.headBlock = BlockNumberAndHash( + fakeExecutionLayerClient.headBlock = BlockNumberAndHash( number = headBlockNumberAtStart, hash = ByteArray(32) { 0 } ) @@ -334,13 +297,13 @@ class StateRecoveryAppWithFakeExecutionClientIntTest { .atMost(2.minutes.toJavaDuration()) .pollInterval(1.seconds.toJavaDuration()) .untilAsserted { - assertThat(executionLayerClient.stateRecoverStatus).isEqualTo( + assertThat(fakeExecutionLayerClient.stateRecoverStatus).isEqualTo( StateRecoveryStatus( headBlockNumber = headBlockNumberAtStart, stateRecoverStartBlockNumber = headBlockNumberAtStart + 1UL ) ) - log.debug("stateRecoverStatus={}", executionLayerClient.stateRecoverStatus) + log.debug("stateRecoverStatus={}", fakeExecutionLayerClient.stateRecoverStatus) } // continue finalizing the rest of the aggregations @@ -353,7 +316,7 @@ class StateRecoveryAppWithFakeExecutionClientIntTest { .atMost(2.minutes.toJavaDuration()) .pollInterval(1.seconds.toJavaDuration()) .untilAsserted { - assertThat(executionLayerClient.stateRecoverStatus) + assertThat(fakeExecutionLayerClient.stateRecoverStatus) .isEqualTo( StateRecoveryStatus( headBlockNumber = lastAggregation!!.endBlockNumber, @@ -362,7 +325,7 @@ class StateRecoveryAppWithFakeExecutionClientIntTest { ) } // assert it does not try to import blocks behind the head block - assertThat(executionLayerClient.importedBlockNumbersInRecoveryMode.minOrNull()) + assertThat(fakeExecutionLayerClient.importedBlockNumbersInRecoveryMode.minOrNull()) .isEqualTo(headBlockNumberAtStart + 1UL) } @@ -387,7 +350,7 @@ class StateRecoveryAppWithFakeExecutionClientIntTest { assertThat(stateRecoverApp.stateRootMismatchFound).isTrue() } - assertThat(executionLayerClient.headBlock.number) + assertThat(fakeExecutionLayerClient.headBlock.number) .isEqualTo(aggregationsAndBlobs[1].aggregation!!.endBlockNumber) } @@ -398,15 +361,14 @@ class StateRecoveryAppWithFakeExecutionClientIntTest { log.debug("forceSyncStopBlockNumber={}", fakeStateManagerClient) stateRecoverApp.start().get() - submitDataToL1ContactAndWaitExecution(waitTimeout = 1.minutes) + submitDataToL1ContactAndWaitExecution(waitTimeout = 3.minutes) await() .atMost(1.minutes.toJavaDuration()) .untilAsserted { - println(executionLayerClient.headBlock.number) - assertThat(executionLayerClient.headBlock.number).isGreaterThanOrEqualTo(debugForceSyncStopBlockNumber) + assertThat(fakeExecutionLayerClient.headBlock.number).isGreaterThanOrEqualTo(debugForceSyncStopBlockNumber) } - assertThat(executionLayerClient.headBlock.number).isEqualTo(debugForceSyncStopBlockNumber) + assertThat(fakeExecutionLayerClient.headBlock.number).isEqualTo(debugForceSyncStopBlockNumber) } } diff --git a/state-recovery/test-cases/src/main/kotlin/linea/staterecovery/test/FakeStateManagerClientBasedOnBlobsRecords.kt b/state-recovery/test-cases/src/main/kotlin/linea/staterecovery/test/FakeStateManagerClient.kt similarity index 88% rename from state-recovery/test-cases/src/main/kotlin/linea/staterecovery/test/FakeStateManagerClientBasedOnBlobsRecords.kt rename to state-recovery/test-cases/src/main/kotlin/linea/staterecovery/test/FakeStateManagerClient.kt index e1b17a22b..8ff3ced36 100644 --- a/state-recovery/test-cases/src/main/kotlin/linea/staterecovery/test/FakeStateManagerClientBasedOnBlobsRecords.kt +++ b/state-recovery/test-cases/src/main/kotlin/linea/staterecovery/test/FakeStateManagerClient.kt @@ -14,11 +14,14 @@ import net.consensys.linea.errors.ErrorResponse import net.consensys.toHexStringUInt256 import net.consensys.zkevm.domain.BlobRecord import tech.pegasys.teku.infrastructure.async.SafeFuture +import java.util.concurrent.ConcurrentHashMap open class FakeStateManagerClient( - private val blocksStateRootHashes: MutableMap = mutableMapOf(), - var headBlockNumber: ULong = blocksStateRootHashes.keys.maxOrNull() ?: 0UL + _blocksStateRootHashes: Map = emptyMap(), + var headBlockNumber: ULong = _blocksStateRootHashes.keys.maxOrNull() ?: 0UL ) : StateManagerClientV1 { + open val blocksStateRootHashes: MutableMap = + ConcurrentHashMap(_blocksStateRootHashes) fun setBlockStateRootHash(blockNumber: ULong, stateRootHash: ByteArray) { blocksStateRootHashes[blockNumber] = stateRootHash @@ -56,8 +59,8 @@ open class FakeStateManagerClient( class FakeStateManagerClientBasedOnBlobsRecords( val blobRecords: List ) : FakeStateManagerClient( - blocksStateRootHashes = blobRecords - .associate { it.endBlockNumber to it.blobCompressionProof!!.finalStateRootHash }.toMutableMap() + _blocksStateRootHashes = blobRecords + .associate { it.endBlockNumber to it.blobCompressionProof!!.finalStateRootHash } ) class FakeStateManagerClientReadFromL1( diff --git a/transaction-exclusion-api/app/build.gradle b/transaction-exclusion-api/app/build.gradle index 686194fc1..c3087865b 100644 --- a/transaction-exclusion-api/app/build.gradle +++ b/transaction-exclusion-api/app/build.gradle @@ -29,8 +29,7 @@ dependencies { implementation "com.fasterxml.jackson.core:jackson-databind:${libs.versions.jackson.get()}" implementation "com.fasterxml.jackson.module:jackson-module-kotlin:${libs.versions.jackson.get()}" - implementation "org.hyperledger.besu.internal:core:${libs.versions.besu.get()}" - implementation "org.hyperledger.besu:plugin-api:${libs.versions.besu.get()}" + implementation(project(":jvm-libs:linea:besu-libs")) api("io.netty:netty-transport-native-epoll:${libs.versions.netty.get()}:linux-x86_64") { because "It enables native transport for Linux."