diff --git a/accounts/abi/bind/util_test.go b/accounts/abi/bind/util_test.go index 87917d43fa..592465f2ac 100644 --- a/accounts/abi/bind/util_test.go +++ b/accounts/abi/bind/util_test.go @@ -83,7 +83,6 @@ func TestWaitDeployed(t *testing.T) { // Send and mine the transaction. backend.Client().SendTransaction(ctx, tx) - time.Sleep(500 * time.Millisecond) //wait for the tx to be mined backend.Commit() select { @@ -118,7 +117,6 @@ func TestWaitDeployedCornerCases(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() backend.Client().SendTransaction(ctx, tx) - time.Sleep(500 * time.Millisecond) //wait for the tx to be mined backend.Commit() notContractCreation := errors.New("tx is not contract creation") if _, err := bind.WaitDeployed(ctx, backend.Client(), tx); err.Error() != notContractCreation.Error() { @@ -137,6 +135,5 @@ func TestWaitDeployedCornerCases(t *testing.T) { }() backend.Client().SendTransaction(ctx, tx) - time.Sleep(500 * time.Millisecond) //wait for the tx to be mined cancel() } diff --git a/beacon/engine/types.go b/beacon/engine/types.go index 9a3ea8d077..960c68d5e9 100644 --- a/beacon/engine/types.go +++ b/beacon/engine/types.go @@ -30,6 +30,12 @@ import ( // building of the payload to commence. 
type PayloadVersion byte +const ( + GetPayloadStage = "getPayload" + NewPayloadStage = "newPayload" + ForkchoiceUpdatedStage = "forkchoiceUpdated" +) + var ( PayloadV1 PayloadVersion = 0x1 PayloadV2 PayloadVersion = 0x2 @@ -181,6 +187,12 @@ type ForkchoiceStateV1 struct { FinalizedBlockHash common.Hash `json:"finalizedBlockHash"` } +type OpSealPayloadResponse struct { + ErrStage string `json:"errStage"` + PayloadStatus PayloadStatusV1 `json:"payloadStatus"` + Payload *ExecutionPayloadEnvelope `json:"payload"` +} + func encodeTransactions(txs []*types.Transaction) [][]byte { var enc = make([][]byte, len(txs)) for i, tx := range txs { diff --git a/cmd/geth/config.go b/cmd/geth/config.go index 27496113d7..da797c727e 100644 --- a/cmd/geth/config.go +++ b/cmd/geth/config.go @@ -219,6 +219,10 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) { cfg.Eth.OverrideVerkle = &v } + if ctx.Bool(utils.MiningEnabledFlag.Name) { + cfg.Eth.TxPool.EnableCache = true + } + backend, eth := utils.RegisterEthService(stack, &cfg.Eth) // Create gauge with geth system and build information diff --git a/consensus/beacon/consensus.go b/consensus/beacon/consensus.go index ba4980b8d4..64eccaf31b 100644 --- a/consensus/beacon/consensus.go +++ b/consensus/beacon/consensus.go @@ -399,11 +399,17 @@ func (beacon *Beacon) FinalizeAndAssemble(chain consensus.ChainHeaderReader, hea // Finalize and assemble the block. beacon.Finalize(chain, header, state, txs, uncles, withdrawals) - // Assign the final state root to header. - header.Root = state.IntermediateRoot(true) + rootCh := make(chan common.Hash) + go func() { + rootCh <- state.IntermediateRoot(true) + }() + + block := types.NewBlockWithWithdrawals(header, txs, uncles, receipts, withdrawals, trie.NewStackTrie(nil)) + headerWithRoot := block.Header() + headerWithRoot.Root = <-rootCh // Assemble and return the final block. 
- return types.NewBlockWithWithdrawals(header, txs, uncles, receipts, withdrawals, trie.NewStackTrie(nil)), nil + return block.WithSeal(headerWithRoot), nil } // Seal generates a new sealing request for the given input block and pushes diff --git a/core/block_validator.go b/core/block_validator.go index 79839d7176..765a202cbe 100644 --- a/core/block_validator.go +++ b/core/block_validator.go @@ -162,7 +162,7 @@ func (v *BlockValidator) ValidateBody(block *types.Block) error { // ValidateState validates the various changes that happen after a state transition, // such as amount of used gas, the receipt roots and the state root itself. -func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateDB, receipts types.Receipts, usedGas uint64) error { +func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateDB, receipts types.Receipts, usedGas uint64, skipRoot bool) error { header := block.Header() if block.GasUsed() != usedGas { return fmt.Errorf("invalid gas used (remote: %d local: %d)", block.GasUsed(), usedGas) @@ -186,14 +186,16 @@ func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateD } return nil }, - func() error { + } + if !skipRoot { + validateFuns = append(validateFuns, func() error { // Validate the state root against the received state root and throw // an error if they don't match. 
if root := statedb.IntermediateRoot(v.config.IsEIP158(header.Number)); header.Root != root { return fmt.Errorf("invalid merkle root (remote: %x local: %x) dberr: %w", header.Root, root, statedb.Error()) } return nil - }, + }) } validateRes := make(chan error, len(validateFuns)) for _, f := range validateFuns { diff --git a/core/blockchain.go b/core/blockchain.go index 516fe23747..825dce4e0d 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1190,6 +1190,13 @@ func (bc *BlockChain) procFutureBlocks() { } } +// CacheBlock cache block in memory +func (bc *BlockChain) CacheBlock(hash common.Hash, block *types.Block) { + bc.hc.numberCache.Add(hash, block.NumberU64()) + bc.hc.headerCache.Add(hash, block.Header()) + bc.blockCache.Add(hash, block) +} + // CacheMiningReceipts cache receipts in memory func (bc *BlockChain) CacheMiningReceipts(hash common.Hash, receipts types.Receipts) { bc.miningReceiptsCache.Add(hash, receipts) @@ -1736,6 +1743,15 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) return 0, nil } + minerMode := false + if len(chain) == 1 { + block := chain[0] + _, receiptExist := bc.miningReceiptsCache.Get(block.Hash()) + _, logExist := bc.miningTxLogsCache.Get(block.Hash()) + _, stateExist := bc.miningStateCache.Get(block.Hash()) + minerMode = receiptExist && logExist && stateExist + } + // Start a parallel signature recovery (signer will fluke on fork transition, minimal perf loss) SenderCacher.RecoverFromBlocks(types.MakeSigner(bc.chainConfig, chain[0].Number(), chain[0].Time()), chain) @@ -1759,7 +1775,14 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) // Peek the error for the first block to decide the directing import logic it := newInsertIterator(chain, results, bc.validator) - block, err := it.next() + var block *types.Block + var err error + if minerMode { + block = chain[0] + it.index = 0 + } else { + block, err = it.next() + } // Left-trim all the known blocks that 
don't need to build snapshot if bc.skipBlock(err, it) { @@ -1975,11 +1998,28 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) ptime := time.Since(pstart) vstart := time.Now() - if err := bc.validator.ValidateState(block, statedb, receipts, usedGas); err != nil { - bc.reportBlock(block, receipts, err) - followupInterrupt.Store(true) - return it.index, err + // Async validate if minerMode + asyncValidateStateCh := make(chan error, 1) + if minerMode { + header := block.Header() + // Can not validate root concurrently + if root := statedb.IntermediateRoot(bc.chainConfig.IsEIP158(header.Number)); header.Root != root { + err := fmt.Errorf("self mined block(hash: %x number %v) verify root err(mined: %x expected: %x) dberr: %w", block.Hash(), block.NumberU64(), header.Root, root, statedb.Error()) + bc.reportBlock(block, receipts, err) + followupInterrupt.Store(true) + return it.index, err + } + go func() { + asyncValidateStateCh <- bc.validator.ValidateState(block, statedb, receipts, usedGas, true) + }() + } else { + if err := bc.validator.ValidateState(block, statedb, receipts, usedGas, false); err != nil { + bc.reportBlock(block, receipts, err) + followupInterrupt.Store(true) + return it.index, err + } } + vtime := time.Since(vstart) proctime := time.Since(start) // processing + validation @@ -2015,6 +2055,13 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) if err != nil { return it.index, err } + if minerMode { + if err := <-asyncValidateStateCh; err != nil { + panic(fmt.Errorf("self mined block(hash: %x number %v) async verify state err: %w", block.Hash(), block.NumberU64(), err)) + } + } + bc.CacheBlock(block.Hash(), block) + // Update the metrics touched during block commit accountCommitTimer.Update(statedb.AccountCommits) // Account commits are complete, we can mark them storageCommitTimer.Update(statedb.StorageCommits) // Storage commits are complete, we can mark them @@ -2033,10 +2080,14 @@ func 
(bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) stats.usedGas += usedGas var snapDiffItems, snapBufItems common.StorageSize - if bc.snaps != nil { + if bc.snaps != nil && !minerMode { snapDiffItems, snapBufItems = bc.snaps.Size() } - trieDiffNodes, trieBufNodes, trieImmutableBufNodes, _ := bc.triedb.Size() + + var trieDiffNodes, trieBufNodes, trieImmutableBufNodes common.StorageSize + if !minerMode { + trieDiffNodes, trieBufNodes, trieImmutableBufNodes, _ = bc.triedb.Size() + } stats.report(chain, it.index, snapDiffItems, snapBufItems, trieDiffNodes, trieBufNodes, trieImmutableBufNodes, setHead) blockGasUsedGauge.Update(int64(block.GasUsed()) / 1000000) @@ -2536,10 +2587,17 @@ func (bc *BlockChain) SetCanonical(head *types.Block) (common.Hash, error) { return common.Hash{}, err } } - bc.writeHeadBlock(head) + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + bc.writeHeadBlock(head) + }() // Emit events logs := bc.collectLogs(head, false) + wg.Wait() + bc.chainFeed.Send(ChainEvent{Block: head, Hash: head.Hash(), Logs: logs}) if len(logs) > 0 { bc.logsFeed.Send(logs) diff --git a/core/blockchain_test.go b/core/blockchain_test.go index c64e9ddbc0..e751b7a85e 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -169,7 +169,7 @@ func testBlockChainImport(chain types.Blocks, blockchain *BlockChain) error { blockchain.reportBlock(block, receipts, err) return err } - err = blockchain.validator.ValidateState(block, statedb, receipts, usedGas) + err = blockchain.validator.ValidateState(block, statedb, receipts, usedGas, false) if err != nil { blockchain.reportBlock(block, receipts, err) return err diff --git a/core/state/snapshot/difflayer.go b/core/state/snapshot/difflayer.go index 70c9f44189..8629dee364 100644 --- a/core/state/snapshot/difflayer.go +++ b/core/state/snapshot/difflayer.go @@ -40,7 +40,7 @@ var ( // Note, bumping this up might drastically increase the size of the bloom // filters that's stored in 
every diff layer. Don't do that without fully // understanding all the implications. - aggregatorMemoryLimit = uint64(4 * 1024 * 1024) + aggregatorMemoryLimit = uint64(4 * 1024 * 1024 * 3) // opBNB block gas limit is about 3 times to Ethereum // aggregatorItemLimit is an approximate number of items that will end up // in the aggregator layer before it's flushed out to disk. A plain account @@ -50,6 +50,9 @@ var ( // smaller number to be on the safe side. aggregatorItemLimit = aggregatorMemoryLimit / 42 + // bloomItemLimit is an approximate number of all difflayer items (128 difflayers + 1 aggregatorlayer) + bloomItemLimit = 25*10000*5 + aggregatorItemLimit + // bloomTargetError is the target false positive rate when the aggregator // layer is at its fullest. The actual value will probably move around up // and down from this number, it's mostly a ballpark figure. @@ -57,16 +60,16 @@ var ( // Note, dropping this down might drastically increase the size of the bloom // filters that's stored in every diff layer. Don't do that without fully // understanding all the implications. - bloomTargetError = 0.02 + bloomTargetError = 0.01 // bloomSize is the ideal bloom filter size given the maximum number of items // it's expected to hold and the target false positive error rate. - bloomSize = math.Ceil(float64(aggregatorItemLimit) * math.Log(bloomTargetError) / math.Log(1/math.Pow(2, math.Log(2)))) + bloomSize = math.Ceil(float64(bloomItemLimit) * math.Log(bloomTargetError) / math.Log(1/math.Pow(2, math.Log(2)))) // bloomFuncs is the ideal number of bits a single entry should set in the // bloom filter to keep its size to a minimum (given it's size and maximum // entry count). 
- bloomFuncs = math.Round((bloomSize / float64(aggregatorItemLimit)) * math.Log(2)) + bloomFuncs = math.Round((bloomSize / float64(bloomItemLimit)) * math.Log(2)) // the bloom offsets are runtime constants which determines which part of the // account/storage hash the hasher functions looks at, to determine the diff --git a/core/state/statedb.go b/core/state/statedb.go index 0d5e7a7eef..69b7385a3e 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -1375,7 +1375,6 @@ func (s *StateDB) Commit(block uint64, deleteEmptyObjects bool) (common.Hash, er if metrics.EnabledExpensive { defer func(start time.Time) { - s.AccountCommits += time.Since(start) accountUpdatedMeter.Mark(int64(s.AccountUpdated)) storageUpdatedMeter.Mark(int64(s.StorageUpdated)) accountDeletedMeter.Mark(int64(s.AccountDeleted)) diff --git a/core/txpool/legacypool/cache_for_miner.go b/core/txpool/legacypool/cache_for_miner.go index e8cedaa902..892d797394 100644 --- a/core/txpool/legacypool/cache_for_miner.go +++ b/core/txpool/legacypool/cache_for_miner.go @@ -5,10 +5,8 @@ import ( "sync" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/txpool" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/metrics" - "github.com/holiman/uint256" ) var ( @@ -16,24 +14,26 @@ var ( localCacheGauge = metrics.NewRegisteredGauge("txpool/legacypool/local/cache", nil) ) +type pendingCache interface { + add(types.Transactions, types.Signer) + del(types.Transactions, types.Signer) + dump() map[common.Address]types.Transactions + markLocal(common.Address) + flattenLocals() []common.Address +} + // copy of pending transactions type cacheForMiner struct { txLock sync.Mutex pending map[common.Address]map[*types.Transaction]struct{} locals map[common.Address]bool addrLock sync.Mutex - - allCache map[common.Address][]*txpool.LazyTransaction - filteredCache map[common.Address][]*txpool.LazyTransaction - cacheLock sync.Mutex } func newCacheForMiner() 
*cacheForMiner { return &cacheForMiner{ - pending: make(map[common.Address]map[*types.Transaction]struct{}), - locals: make(map[common.Address]bool), - allCache: make(map[common.Address][]*txpool.LazyTransaction), - filteredCache: make(map[common.Address][]*txpool.LazyTransaction), + pending: make(map[common.Address]map[*types.Transaction]struct{}), + locals: make(map[common.Address]bool), } } @@ -75,9 +75,8 @@ func (pc *cacheForMiner) del(txs types.Transactions, signer types.Signer) { } } -func (pc *cacheForMiner) sync2cache(pool txpool.LazyResolver, filter func(txs types.Transactions, addr common.Address) types.Transactions) { +func (pc *cacheForMiner) dump() map[common.Address]types.Transactions { pending := make(map[common.Address]types.Transactions) - pc.txLock.Lock() for addr, txlist := range pc.pending { pending[addr] = make(types.Transactions, 0, len(txlist)) @@ -86,46 +85,10 @@ func (pc *cacheForMiner) sync2cache(pool txpool.LazyResolver, filter func(txs ty } } pc.txLock.Unlock() - - // convert pending to lazyTransactions - filteredLazy := make(map[common.Address][]*txpool.LazyTransaction) - allLazy := make(map[common.Address][]*txpool.LazyTransaction) - for addr, txs := range pending { + for _, txs := range pending { // sorted by nonce sort.Sort(types.TxByNonce(txs)) - filterd := filter(txs, addr) - if len(txs) > 0 { - lazies := make([]*txpool.LazyTransaction, len(txs)) - for i, tx := range txs { - lazies[i] = &txpool.LazyTransaction{ - Pool: pool, - Hash: tx.Hash(), - Tx: tx, - Time: tx.Time(), - GasFeeCap: uint256.MustFromBig(tx.GasFeeCap()), - GasTipCap: uint256.MustFromBig(tx.GasTipCap()), - Gas: tx.Gas(), - BlobGas: tx.BlobGas(), - } - } - allLazy[addr] = lazies - filteredLazy[addr] = lazies[:len(filterd)] - } - } - - pc.cacheLock.Lock() - pc.filteredCache = filteredLazy - pc.allCache = allLazy - pc.cacheLock.Unlock() -} - -func (pc *cacheForMiner) dump(filtered bool) map[common.Address][]*txpool.LazyTransaction { - pc.cacheLock.Lock() - pending := 
pc.allCache - if filtered { - pending = pc.filteredCache } - pc.cacheLock.Unlock() return pending } @@ -136,7 +99,7 @@ func (pc *cacheForMiner) markLocal(addr common.Address) { pc.locals[addr] = true } -func (pc *cacheForMiner) IsLocal(addr common.Address) bool { +func (pc *cacheForMiner) isLocal(addr common.Address) bool { pc.addrLock.Lock() defer pc.addrLock.Unlock() return pc.locals[addr] diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index 202f495dc6..414d1b1603 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -153,6 +153,8 @@ type BlockChain interface { // Config are the configuration parameters of the transaction pool. type Config struct { + EnableCache bool // enable pending cache for mining. Set as true only --mine option is enabled + Locals []common.Address // Addresses that should be treated by default as local NoLocals bool // Whether local transaction handling should be disabled Journal string // Journal of local transactions to survive node restarts @@ -236,6 +238,12 @@ func (config *Config) sanitize() Config { log.Warn("Sanitizing invalid txpool reannounce time", "provided", conf.ReannounceTime, "updated", time.Minute) conf.ReannounceTime = time.Minute } + // log to inform user if the cache is enabled or not + if conf.EnableCache { + log.Info("legacytxpool Pending Cache is enabled") + } else { + log.Info("legacytxpool Pending Cache is disabled") + } return conf } @@ -270,7 +278,7 @@ type LegacyPool struct { all *lookup // All transactions to allow lookups priced *pricedList // All transactions sorted by price - pendingCache *cacheForMiner //pending list cache for miner + pendingCache pendingCache //pending list cache for miner reqResetCh chan *txpoolResetRequest reqPromoteCh chan *accountSet @@ -313,6 +321,9 @@ func New(config Config, chain BlockChain) *LegacyPool { initDoneCh: make(chan struct{}), pendingCache: newCacheForMiner(), } + if !config.EnableCache { + 
pool.pendingCache = newNoneCacheForMiner(pool) + } pool.locals = newAccountSet(pool.signer) for _, addr := range config.Locals { log.Info("Setting new local account", "address", addr) @@ -349,9 +360,6 @@ func (pool *LegacyPool) Init(gasTip uint64, head *types.Header, reserve txpool.A // Set the basic pool parameters pool.gasTip.Store(uint256.NewInt(gasTip)) - // set dumper - pool.pendingCache.sync2cache(pool, pool.createFilter(pool.gasTip.Load().ToBig(), head.BaseFee)) - // Initialize the state with head block, or fallback to empty one in // case the head state is not available (might occur when node is not // fully synced). @@ -386,27 +394,9 @@ func (pool *LegacyPool) Init(gasTip uint64, head *types.Header, reserve txpool.A } pool.wg.Add(1) go pool.loop() - go pool.loopOfSync() return nil } -func (pool *LegacyPool) loopOfSync() { - ticker := time.NewTicker(400 * time.Millisecond) - for { - select { - case <-pool.reorgShutdownCh: - return - case <-ticker.C: - gasTip := pool.gasTip.Load() - currHead := pool.currentHead.Load() - if gasTip == nil || currHead == nil { - continue - } - pool.pendingCache.sync2cache(pool, pool.createFilter(gasTip.ToBig(), currHead.BaseFee)) - } - } -} - // loop is the transaction pool's main event loop, waiting for and reacting to // outside blockchain events as well as for various reporting and transaction // eviction events. @@ -645,35 +635,56 @@ func (pool *LegacyPool) ContentFrom(addr common.Address) ([]*types.Transaction, // The transactions can also be pre-filtered by the dynamic fee components to // reduce allocations and load on downstream subsystems. 
func (pool *LegacyPool) Pending(filter txpool.PendingFilter) map[common.Address][]*txpool.LazyTransaction { - empty := txpool.PendingFilter{} - if filter == empty { - // return all pending transactions, no filtering - return pool.pendingCache.dump(false) - } + defer func(t0 time.Time) { + getPendingDurationTimer.Update(time.Since(t0)) + }(time.Now()) // If only blob transactions are requested, this pool is unsuitable as it // contains none, don't even bother. if filter.OnlyBlobTxs { return nil } - defer func(t0 time.Time) { - getPendingDurationTimer.Update(time.Since(t0)) - }(time.Now()) - // It is a bit tricky here, we don't do the filtering here. - return pool.pendingCache.dump(true) -} -func (pool *LegacyPool) createFilter(gasPrice, baseFee *big.Int) func(txs types.Transactions, addr common.Address) types.Transactions { - return func(txs types.Transactions, addr common.Address) types.Transactions { - if !pool.pendingCache.IsLocal(addr) { + // Convert the new uint256.Int types to the old big.Int ones used by the legacy pool + var ( + minTipBig *big.Int + baseFeeBig *big.Int + ) + if filter.MinTip != nil { + minTipBig = filter.MinTip.ToBig() + } + if filter.BaseFee != nil { + baseFeeBig = filter.BaseFee.ToBig() + } + pending := make(map[common.Address][]*txpool.LazyTransaction, len(pool.pending)) + for addr, txs := range pool.pendingCache.dump() { + + // If the miner requests tip enforcement, cap the lists now + if minTipBig != nil && !pool.locals.contains(addr) { for i, tx := range txs { - if tx.EffectiveGasTipIntCmp(gasPrice, baseFee) < 0 { + if tx.EffectiveGasTipIntCmp(minTipBig, baseFeeBig) < 0 { txs = txs[:i] break } } } - return txs + if len(txs) > 0 { + lazies := make([]*txpool.LazyTransaction, len(txs)) + for i := 0; i < len(txs); i++ { + lazies[i] = &txpool.LazyTransaction{ + Pool: pool, + Hash: txs[i].Hash(), + Tx: txs[i], + Time: txs[i].Time(), + GasFeeCap: uint256.MustFromBig(txs[i].GasFeeCap()), + GasTipCap: uint256.MustFromBig(txs[i].GasTipCap()), + 
Gas: txs[i].Gas(), + BlobGas: txs[i].BlobGas(), + } + } + pending[addr] = lazies + } } + return pending } // Locals retrieves the accounts currently considered local by the pool. @@ -1469,10 +1480,6 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest, pool.priced.SetBaseFee(pendingBaseFee) } } - gasTip, baseFee := pool.gasTip.Load(), pendingBaseFee - go func() { - pool.pendingCache.sync2cache(pool, pool.createFilter(gasTip.ToBig(), baseFee)) - }() // Update all accounts to the latest known pending nonce nonces := make(map[common.Address]uint64, len(pool.pending)) for addr, list := range pool.pending { diff --git a/core/txpool/legacypool/legacypool_test.go b/core/txpool/legacypool/legacypool_test.go index 6b900ed2c0..b587f3676d 100644 --- a/core/txpool/legacypool/legacypool_test.go +++ b/core/txpool/legacypool/legacypool_test.go @@ -47,6 +47,11 @@ var ( // sideeffects used during testing. testTxPoolConfig Config + // + testTxPoolConfigEnableCache Config + + enableCache bool + // eip1559Config is a chain config with EIP-1559 enabled at block 0. 
eip1559Config *params.ChainConfig ) @@ -55,6 +60,10 @@ func init() { testTxPoolConfig = DefaultConfig testTxPoolConfig.Journal = "" + testTxPoolConfigEnableCache = DefaultConfig + testTxPoolConfigEnableCache.Journal = "" + testTxPoolConfigEnableCache.EnableCache = true + cpy := *params.TestChainConfig eip1559Config = &cpy eip1559Config.BerlinBlock = common.Big0 @@ -163,7 +172,12 @@ func setupPoolWithConfig(config *params.ChainConfig) (*LegacyPool, *ecdsa.Privat blockchain := newTestBlockChain(config, 10000000, statedb, new(event.Feed)) key, _ := crypto.GenerateKey() - pool := New(testTxPoolConfig, blockchain) + var pool *LegacyPool + if enableCache { + pool = New(testTxPoolConfigEnableCache, blockchain) + } else { + pool = New(testTxPoolConfig, blockchain) + } if err := pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver()); err != nil { panic(err) } @@ -1534,12 +1548,22 @@ func TestMinGasPriceEnforced(t *testing.T) { } } +func TestRepricingDynamicFeeEnableCache(t *testing.T) { + enableCache = true + repricingDynamicFee(t) + enableCache = false +} + // Tests that setting the transaction pool gas price to a higher value correctly // discards everything cheaper (legacy & dynamic fee) than that and moves any // gapped transactions back from the pending pool to the queue. // // Note, local transactions are never allowed to be dropped. 
func TestRepricingDynamicFee(t *testing.T) { + repricingDynamicFee(t) +} + +func repricingDynamicFee(t *testing.T) { t.Parallel() // Create the pool to test the pricing enforcement with diff --git a/core/txpool/legacypool/nonecache_for_miner.go b/core/txpool/legacypool/nonecache_for_miner.go new file mode 100644 index 0000000000..dd5ed21836 --- /dev/null +++ b/core/txpool/legacypool/nonecache_for_miner.go @@ -0,0 +1,52 @@ +package legacypool + +import ( + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" +) + +var ( + _ pendingCache = (*noneCacheForMiner)(nil) +) + +type noneCacheForMiner struct { + pool *LegacyPool +} + +func newNoneCacheForMiner(pool *LegacyPool) *noneCacheForMiner { + return &noneCacheForMiner{pool: pool} +} + +func (nc *noneCacheForMiner) add(txs types.Transactions, signer types.Signer) { + // do nothing +} + +func (nc *noneCacheForMiner) del(txs types.Transactions, signer types.Signer) { + // do nothing +} + +func (nc *noneCacheForMiner) dump() map[common.Address]types.Transactions { + // dump all pending transactions from the pool + nc.pool.mu.RLock() + defer nc.pool.mu.RUnlock() + pending := make(map[common.Address]types.Transactions) + for addr, txlist := range nc.pool.pending { + pending[addr] = txlist.Flatten() + } + return pending +} + +func (nc *noneCacheForMiner) markLocal(addr common.Address) { + // do nothing +} + +func (nc *noneCacheForMiner) flattenLocals() []common.Address { + // return a copy of pool.locals + nc.pool.mu.RLock() + defer nc.pool.mu.RUnlock() + var locals []common.Address = make([]common.Address, 0, len(nc.pool.locals.accounts)) + for addr := range nc.pool.locals.accounts { + locals = append(locals, addr) + } + return locals +} diff --git a/core/types.go b/core/types.go index 36eb0d1ded..c236058634 100644 --- a/core/types.go +++ b/core/types.go @@ -33,7 +33,7 @@ type Validator interface { // ValidateState validates the given statedb and optionally the receipts and // gas used. 
- ValidateState(block *types.Block, state *state.StateDB, receipts types.Receipts, usedGas uint64) error + ValidateState(block *types.Block, state *state.StateDB, receipts types.Receipts, usedGas uint64, skipRoot bool) error } // Prefetcher is an interface for pre-caching transaction signatures and state. diff --git a/eth/catalyst/api.go b/eth/catalyst/api.go index e8703f7532..7a6f9fdf59 100644 --- a/eth/catalyst/api.go +++ b/eth/catalyst/api.go @@ -45,6 +45,7 @@ var ( forkchoiceUpdateHeadsTimer = metrics.NewRegisteredTimer("api/engine/forkchoiceUpdate/heads", nil) getPayloadTimer = metrics.NewRegisteredTimer("api/engine/get/payload", nil) newPayloadTimer = metrics.NewRegisteredTimer("api/engine/new/payload", nil) + sealPayloadTimer = metrics.NewRegisteredTimer("api/engine/seal/payload", nil) ) // Register adds the engine API to the full node. @@ -99,6 +100,8 @@ var caps = []string{ "engine_getPayloadBodiesByHashV1", "engine_getPayloadBodiesByRangeV1", "engine_getClientVersionV1", + "engine_opSealPayloadV2", + "engine_opSealPayloadV3", } type ConsensusAPI struct { @@ -602,11 +605,17 @@ func (api *ConsensusAPI) newPayload(params engine.ExecutableData, versionedHashe defer api.newPayloadLock.Unlock() log.Trace("Engine API request received", "method", "NewPayload", "number", params.Number, "hash", params.BlockHash) - block, err := engine.ExecutableDataToBlock(params, versionedHashes, beaconRoot) - if err != nil { - log.Warn("Invalid NewPayload params", "params", params, "error", err) - return api.invalid(err, nil), nil + + block := api.localBlocks.getBlockByHash(params.BlockHash) + if block == nil { + var err error + block, err = engine.ExecutableDataToBlock(params, versionedHashes, beaconRoot) + if err != nil { + log.Warn("Invalid NewPayload params", "params", params, "error", err) + return api.invalid(err, nil), nil + } } + // Stash away the last update to warn the user if the beacon client goes offline api.lastNewPayloadLock.Lock() api.lastNewPayloadUpdate = 
time.Now() @@ -691,6 +700,64 @@ func (api *ConsensusAPI) newPayload(params engine.ExecutableData, versionedHashe return engine.PayloadStatusV1{Status: engine.VALID, LatestValidHash: &hash}, nil } +// OpSealPayloadV2 is combination API of payload sealing: getPayload, newPayload, forkchoiceUpdated. +func (api *ConsensusAPI) OpSealPayloadV2(payloadID engine.PayloadID, update engine.ForkchoiceStateV1, needPayload bool) (engine.OpSealPayloadResponse, error) { + return api.opSealPayload(payloadID, update, needPayload, "V2") +} + +// OpSealPayloadV3 is combination API of payload sealing: getPayload, newPayload, forkchoiceUpdated. +func (api *ConsensusAPI) OpSealPayloadV3(payloadID engine.PayloadID, update engine.ForkchoiceStateV1, needPayload bool) (engine.OpSealPayloadResponse, error) { + return api.opSealPayload(payloadID, update, needPayload, "V3") +} + +func (api *ConsensusAPI) opSealPayload(payloadID engine.PayloadID, update engine.ForkchoiceStateV1, needPayload bool, version string) (engine.OpSealPayloadResponse, error) { + start := time.Now() + defer func() { + sealPayloadTimer.UpdateSince(start) + log.Debug("sealPayloadTimer", "duration", common.PrettyDuration(time.Since(start)), "payloadID", payloadID) + }() + var payloadEnvelope *engine.ExecutionPayloadEnvelope + var err error + if version == "V2" { + payloadEnvelope, err = api.GetPayloadV2(payloadID) + } else if version == "V3" { + payloadEnvelope, err = api.GetPayloadV3(payloadID) + } else { + return engine.OpSealPayloadResponse{ErrStage: engine.GetPayloadStage}, engine.UnsupportedFork.With(errors.New("invalid engine api version")) + } + if err != nil { + log.Error("Seal payload error when get payload", "error", err, "payloadID", payloadID) + return engine.OpSealPayloadResponse{ErrStage: engine.GetPayloadStage}, err + } + + var payloadStatus engine.PayloadStatusV1 + if version == "V2" { + payloadStatus, err = api.NewPayloadV2(*payloadEnvelope.ExecutionPayload) + } else if version == "V3" { + payloadStatus, err 
= api.NewPayloadV3(*payloadEnvelope.ExecutionPayload, []common.Hash{}, payloadEnvelope.ParentBeaconBlockRoot) + } else { + return engine.OpSealPayloadResponse{ErrStage: engine.NewPayloadStage}, engine.UnsupportedFork.With(errors.New("invalid engine api version")) + } + if err != nil || payloadStatus.Status != engine.VALID { + log.Error("Seal payload error when new payload", "error", err, "payloadStatus", payloadStatus) + return engine.OpSealPayloadResponse{ErrStage: engine.NewPayloadStage, PayloadStatus: payloadStatus}, err + } + + update.HeadBlockHash = payloadEnvelope.ExecutionPayload.BlockHash + updateResponse, err := api.ForkchoiceUpdatedV3(update, nil) + if err != nil || updateResponse.PayloadStatus.Status != engine.VALID { + log.Error("Seal payload error when forkchoiceUpdated", "error", err, "payloadStatus", updateResponse.PayloadStatus) + return engine.OpSealPayloadResponse{ErrStage: engine.ForkchoiceUpdatedStage, PayloadStatus: updateResponse.PayloadStatus}, err + } + + log.Info("opSealPayload succeed", "hash", payloadEnvelope.ExecutionPayload.BlockHash, "number", payloadEnvelope.ExecutionPayload.Number, "id", payloadID, "payloadStatus", updateResponse.PayloadStatus) + if needPayload { + return engine.OpSealPayloadResponse{PayloadStatus: updateResponse.PayloadStatus, Payload: payloadEnvelope}, nil + } else { + return engine.OpSealPayloadResponse{PayloadStatus: updateResponse.PayloadStatus}, nil + } +} + // delayPayloadImport stashes the given block away for import at a later time, // either via a forkchoice update or a sync extension. 
This method is meant to be called by the newpayload command when the block seems to be ok, but some diff --git a/eth/catalyst/queue.go b/eth/catalyst/queue.go index d42904843b..f2a8165ddb 100644 --- a/eth/catalyst/queue.go +++ b/eth/catalyst/queue.go @@ -126,6 +126,23 @@ func (q *payloadQueue) has(id engine.PayloadID) bool { return false } +// getBlockByHash retrieves the block from a previously stored payload, or nil if it does not exist. +func (q *payloadQueue) getBlockByHash(hash common.Hash) *types.Block { + q.lock.RLock() + defer q.lock.RUnlock() + + for _, item := range q.payloads { + if item == nil { + return nil + } + block := item.payload.GetBlock() + if block != nil && block.Hash() == hash { + return block + } + } + return nil +} + // headerQueueItem represents an hash->header tuple to store until it's retrieved // or evicted. type headerQueueItem struct { diff --git a/ethclient/simulated/backend.go b/ethclient/simulated/backend.go index 5c137d4079..1df0a73150 100644 --- a/ethclient/simulated/backend.go +++ b/ethclient/simulated/backend.go @@ -178,8 +178,6 @@ func (n *Backend) Close() error { // Commit seals a block and moves the chain forward to a new empty block. func (n *Backend) Commit() common.Hash { - // wait for the transactions to be sync into cache - time.Sleep(350 * time.Millisecond) return n.beacon.Commit() } diff --git a/ethclient/simulated/backend_test.go b/ethclient/simulated/backend_test.go index 9307e2105a..a8fd7913c3 100644 --- a/ethclient/simulated/backend_test.go +++ b/ethclient/simulated/backend_test.go @@ -214,7 +214,6 @@ func TestForkResendTx(t *testing.T) { t.Fatalf("could not create transaction: %v", err) } client.SendTransaction(ctx, tx) - time.Sleep(1 * time.Second) sim.Commit() // 3.
diff --git a/miner/payload_building.go b/miner/payload_building.go index 457e526218..94fc29c8be 100644 --- a/miner/payload_building.go +++ b/miner/payload_building.go @@ -238,6 +238,15 @@ func (payload *Payload) resolve(onlyFull bool) *engine.ExecutionPayloadEnvelope return nil } +func (payload *Payload) GetBlock() *types.Block { + if payload.full != nil { + return payload.full + } else if payload.empty != nil { + return payload.empty + } + return nil +} + // interruptBuilding sets an interrupt for a potentially ongoing // block building process. // This will prevent it from adding new transactions to the block, and if it is diff --git a/miner/worker.go b/miner/worker.go index 5cf742bcd5..3ea2bfc76b 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -1296,7 +1296,7 @@ func (w *worker) fillTransactions(interrupt *atomic.Int32, env *environment) err pendingBlobTxs := w.eth.TxPool().Pending(filter) packFromTxpoolTimer.UpdateSince(start) - log.Debug("packFromTxpoolTimer", "duration", common.PrettyDuration(time.Since(start)), "hash", env.header.Hash(), "txs", len(pendingPlainTxs)) + log.Debug("packFromTxpoolTimer", "duration", common.PrettyDuration(time.Since(start)), "hash", env.header.Hash()) // Split the pending transactions into locals and remotes. localPlainTxs, remotePlainTxs := make(map[common.Address][]*txpool.LazyTransaction), pendingPlainTxs