
Commit

Merge branch 'develop' into txpool-opt-broadcast
andyzhang2023 authored Jan 8, 2025
2 parents 3571931 + 1168abd commit 2efa74b
Showing 20 changed files with 342 additions and 125 deletions.
3 changes: 0 additions & 3 deletions accounts/abi/bind/util_test.go
@@ -83,7 +83,6 @@ func TestWaitDeployed(t *testing.T) {

// Send and mine the transaction.
backend.Client().SendTransaction(ctx, tx)
time.Sleep(500 * time.Millisecond) //wait for the tx to be mined
backend.Commit()

select {
@@ -118,7 +117,6 @@ func TestWaitDeployedCornerCases(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
backend.Client().SendTransaction(ctx, tx)
time.Sleep(500 * time.Millisecond) //wait for the tx to be mined
backend.Commit()
notContractCreation := errors.New("tx is not contract creation")
if _, err := bind.WaitDeployed(ctx, backend.Client(), tx); err.Error() != notContractCreation.Error() {
@@ -137,6 +135,5 @@ func TestWaitDeployedCornerCases(t *testing.T) {
}()

backend.Client().SendTransaction(ctx, tx)
time.Sleep(500 * time.Millisecond) //wait for the tx to be mined
cancel()
}
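
The three removed time.Sleep calls papered over a race that no longer exists: with the simulated backend, Commit() seals the pending block synchronously, so the transaction is mined as soon as Commit returns. A minimal sketch of the resulting pattern, assuming the ethclient/simulated backend these tests already use (the helper name and error handling are illustrative):

package txwait

import (
	"context"
	"testing"

	"github.com/ethereum/go-ethereum/accounts/abi/bind"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/ethclient/simulated"
)

// sendAndMine sketches the pattern used by the updated tests: submit the
// transaction, seal a block synchronously with Commit, then wait on the
// receipt. No fixed sleep is needed because Commit mines on demand.
func sendAndMine(t *testing.T, backend *simulated.Backend, tx *types.Transaction) *types.Receipt {
	t.Helper()
	ctx := context.Background()

	if err := backend.Client().SendTransaction(ctx, tx); err != nil {
		t.Fatalf("send transaction: %v", err)
	}
	backend.Commit() // seals the pending block immediately

	receipt, err := bind.WaitMined(ctx, backend.Client(), tx)
	if err != nil {
		t.Fatalf("wait mined: %v", err)
	}
	return receipt
}
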
12 changes: 12 additions & 0 deletions beacon/engine/types.go
@@ -30,6 +30,12 @@ import (
// building of the payload to commence.
type PayloadVersion byte

const (
GetPayloadStage = "getPayload"
NewPayloadStage = "newPayload"
ForkchoiceUpdatedStage = "forkchoiceUpdated"
)

var (
PayloadV1 PayloadVersion = 0x1
PayloadV2 PayloadVersion = 0x2
@@ -181,6 +187,12 @@ type ForkchoiceStateV1 struct {
FinalizedBlockHash common.Hash `json:"finalizedBlockHash"`
}

type OpSealPayloadResponse struct {
ErrStage string `json:"errStage"`
PayloadStatus PayloadStatusV1 `json:"payloadStatus"`
Payload *ExecutionPayloadEnvelope `json:"payload"`
}

func encodeTransactions(txs []*types.Transaction) [][]byte {
var enc = make([][]byte, len(txs))
for i, tx := range txs {
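
The new stage constants and OpSealPayloadResponse appear to back a combined seal-payload flow (getPayload, then newPayload, then forkchoiceUpdated), letting a caller see in one JSON response which stage failed and what the last payload status was. A hypothetical sketch of filling and serializing the type; the helper and the failure scenario are illustrative, not part of this commit:

package main

import (
	"encoding/json"
	"fmt"

	"github.com/ethereum/go-ethereum/beacon/engine"
)

// sealResult is a hypothetical helper showing how the response type might be
// filled in: a successful run carries the payload, a failed run records the
// stage that broke and the last payload status.
func sealResult(payload *engine.ExecutionPayloadEnvelope, status engine.PayloadStatusV1, failedStage string) engine.OpSealPayloadResponse {
	return engine.OpSealPayloadResponse{
		ErrStage:      failedStage, // empty when all stages succeeded
		PayloadStatus: status,
		Payload:       payload,
	}
}

func main() {
	resp := sealResult(nil, engine.PayloadStatusV1{Status: engine.INVALID}, engine.NewPayloadStage)
	out, _ := json.MarshalIndent(resp, "", "  ")
	fmt.Println(string(out))
}
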
4 changes: 4 additions & 0 deletions cmd/geth/config.go
@@ -219,6 +219,10 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {
cfg.Eth.OverrideVerkle = &v
}

if ctx.Bool(utils.MiningEnabledFlag.Name) {
cfg.Eth.TxPool.EnableCache = true
}

backend, eth := utils.RegisterEthService(stack, &cfg.Eth)

// Create gauge with geth system and build information
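
The transaction-pool cache is keyed off the existing --mine flag rather than a new option, so only block-producing nodes pay for the extra memory. A small sketch of the same flag-to-config wiring with urfave/cli (the EnableCache field name comes from the diff; the config struct here is a simplified stand-in):

package main

import (
	"fmt"
	"os"

	"github.com/urfave/cli/v2"
)

// txPoolConfig is a simplified stand-in for the real transaction pool config.
type txPoolConfig struct {
	EnableCache bool
}

func main() {
	cfg := txPoolConfig{}
	app := &cli.App{
		Flags: []cli.Flag{
			&cli.BoolFlag{Name: "mine", Usage: "Enable mining"},
		},
		Action: func(ctx *cli.Context) error {
			// Mirror of the makeFullNode change: turn the pool cache on
			// only when the node is configured to mine.
			if ctx.Bool("mine") {
				cfg.EnableCache = true
			}
			fmt.Printf("txpool cache enabled: %v\n", cfg.EnableCache)
			return nil
		},
	}
	if err := app.Run(os.Args); err != nil {
		fmt.Fprintln(os.Stderr, err)
	}
}
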
12 changes: 9 additions & 3 deletions consensus/beacon/consensus.go
@@ -399,11 +399,17 @@ func (beacon *Beacon) FinalizeAndAssemble(chain consensus.ChainHeaderReader, hea
// Finalize and assemble the block.
beacon.Finalize(chain, header, state, txs, uncles, withdrawals)

// Assign the final state root to header.
header.Root = state.IntermediateRoot(true)
rootCh := make(chan common.Hash)
go func() {
rootCh <- state.IntermediateRoot(true)
}()

block := types.NewBlockWithWithdrawals(header, txs, uncles, receipts, withdrawals, trie.NewStackTrie(nil))
headerWithRoot := block.Header()
headerWithRoot.Root = <-rootCh

// Assemble and return the final block.
return types.NewBlockWithWithdrawals(header, txs, uncles, receipts, withdrawals, trie.NewStackTrie(nil)), nil
return block.WithSeal(headerWithRoot), nil
}

// Seal generates a new sealing request for the given input block and pushes
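
FinalizeAndAssemble now overlaps the state-root hash with block assembly: IntermediateRoot runs in a goroutine while the block body (transaction trie, receipts, withdrawals) is built, and the freshly computed root is stamped onto a copy of the header via WithSeal. A stripped-down sketch of that overlap, with the expensive calls replaced by placeholders:

package main

import (
	"fmt"
	"time"
)

// computeRoot and assembleBody stand in for state.IntermediateRoot and
// types.NewBlockWithWithdrawals; both are comparatively slow, so running them
// concurrently shortens the critical path of block building.
func computeRoot() string {
	time.Sleep(30 * time.Millisecond)
	return "0xstate-root"
}

func assembleBody() string {
	time.Sleep(20 * time.Millisecond)
	return "block-body"
}

func main() {
	start := time.Now()

	rootCh := make(chan string, 1)
	go func() { rootCh <- computeRoot() }()

	body := assembleBody() // proceeds while the root is being hashed
	root := <-rootCh       // join point: both results are now available

	fmt.Printf("sealed %s with root %s in %v\n", body, root, time.Since(start))
}
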
8 changes: 5 additions & 3 deletions core/block_validator.go
@@ -162,7 +162,7 @@ func (v *BlockValidator) ValidateBody(block *types.Block) error {

// ValidateState validates the various changes that happen after a state transition,
// such as amount of used gas, the receipt roots and the state root itself.
func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateDB, receipts types.Receipts, usedGas uint64) error {
func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateDB, receipts types.Receipts, usedGas uint64, skipRoot bool) error {
header := block.Header()
if block.GasUsed() != usedGas {
return fmt.Errorf("invalid gas used (remote: %d local: %d)", block.GasUsed(), usedGas)
@@ -186,14 +186,16 @@ func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateD
}
return nil
},
func() error {
}
if !skipRoot {
validateFuns = append(validateFuns, func() error {
// Validate the state root against the received state root and throw
// an error if they don't match.
if root := statedb.IntermediateRoot(v.config.IsEIP158(header.Number)); header.Root != root {
return fmt.Errorf("invalid merkle root (remote: %x local: %x) dberr: %w", header.Root, root, statedb.Error())
}
return nil
},
})
}
validateRes := make(chan error, len(validateFuns))
for _, f := range validateFuns {
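
ValidateState gains a skipRoot flag: the gas and receipt checks always run, but the root comparison is appended to the list of validation closures only when the caller has not already verified it, as the self-mined fast path in blockchain.go now does. A compact sketch of the conditional-closure fan-out, with the concrete checks abstracted into placeholders:

package main

import (
	"errors"
	"fmt"
)

// validate runs a set of independent checks concurrently and returns the first
// failure it sees, mirroring how ValidateState fans out its closures. The root
// check is only included when skipRoot is false.
func validate(skipRoot bool) error {
	checks := []func() error{
		func() error { return nil },                        // e.g. receipt/bloom hash check
		func() error { return errors.New("gas mismatch") }, // e.g. used-gas check (fails here)
	}
	if !skipRoot {
		checks = append(checks, func() error { return nil }) // e.g. state-root check
	}

	errCh := make(chan error, len(checks))
	for _, f := range checks {
		go func(f func() error) { errCh <- f() }(f)
	}
	for range checks {
		if err := <-errCh; err != nil {
			return err
		}
	}
	return nil
}

func main() {
	fmt.Println(validate(true)) // prints "gas mismatch"
}
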
74 changes: 66 additions & 8 deletions core/blockchain.go
@@ -1190,6 +1190,13 @@ func (bc *BlockChain) procFutureBlocks() {
}
}

// CacheBlock cache block in memory
func (bc *BlockChain) CacheBlock(hash common.Hash, block *types.Block) {
bc.hc.numberCache.Add(hash, block.NumberU64())
bc.hc.headerCache.Add(hash, block.Header())
bc.blockCache.Add(hash, block)
}

// CacheMiningReceipts cache receipts in memory
func (bc *BlockChain) CacheMiningReceipts(hash common.Hash, receipts types.Receipts) {
bc.miningReceiptsCache.Add(hash, receipts)
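
CacheBlock fills the number, header and block LRU caches directly, so a block this node just produced (together with the receipts, logs and state kept by the neighbouring CacheMining* helpers) can be served and re-imported without a database round trip. A toy sketch of the same keyed-LRU idea, using hashicorp/golang-lru/v2 rather than geth's internal cache types:

package main

import (
	"fmt"

	lru "github.com/hashicorp/golang-lru/v2"
)

type block struct {
	hash   string
	number uint64
}

func main() {
	// Separate bounded caches per object kind, as on BlockChain
	// (blockCache, headerCache, numberCache, miningReceiptsCache, ...).
	blockCache, _ := lru.New[string, *block](256)
	numberCache, _ := lru.New[string, uint64](2048)

	b := &block{hash: "0xabc", number: 100}
	blockCache.Add(b.hash, b)
	numberCache.Add(b.hash, b.number)

	if cached, ok := blockCache.Get("0xabc"); ok {
		fmt.Printf("hit: block #%d served from memory\n", cached.number)
	}
}
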
@@ -1736,6 +1743,15 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
return 0, nil
}

minerMode := false
if len(chain) == 1 {
block := chain[0]
_, receiptExist := bc.miningReceiptsCache.Get(block.Hash())
_, logExist := bc.miningTxLogsCache.Get(block.Hash())
_, stateExist := bc.miningStateCache.Get(block.Hash())
minerMode = receiptExist && logExist && stateExist
}

// Start a parallel signature recovery (signer will fluke on fork transition, minimal perf loss)
SenderCacher.RecoverFromBlocks(types.MakeSigner(bc.chainConfig, chain[0].Number(), chain[0].Time()), chain)
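
insertChain now recognises a miner-mode import: a single-block chain whose hash is present in all three mining caches can only be a block this node just sealed, so later steps reuse those cached results instead of recomputing them. A hypothetical sketch of both sides of that handshake; the miner-side call that populates the caches is assumed and not shown in this diff:

package main

import "fmt"

// cacheSet is a toy stand-in for the three mining caches on BlockChain
// (miningReceiptsCache, miningTxLogsCache, miningStateCache).
type cacheSet struct {
	receipts map[string]bool
	logs     map[string]bool
	state    map[string]bool
}

// storeMined is the producer side: after sealing, the miner caches what it
// already computed for its own block.
func (c *cacheSet) storeMined(hash string) {
	c.receipts[hash] = true
	c.logs[hash] = true
	c.state[hash] = true
}

// isSelfMined is the consumer side, mirroring the minerMode check in
// insertChain: only a block present in all three caches takes the fast path.
func (c *cacheSet) isSelfMined(hash string) bool {
	return c.receipts[hash] && c.logs[hash] && c.state[hash]
}

func main() {
	c := &cacheSet{receipts: map[string]bool{}, logs: map[string]bool{}, state: map[string]bool{}}
	c.storeMined("0xsealed")
	fmt.Println(c.isSelfMined("0xsealed"), c.isSelfMined("0xremote")) // true false
}
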

@@ -1759,7 +1775,14 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)

// Peek the error for the first block to decide the directing import logic
it := newInsertIterator(chain, results, bc.validator)
block, err := it.next()
var block *types.Block
var err error
if minerMode {
block = chain[0]
it.index = 0
} else {
block, err = it.next()
}

// Left-trim all the known blocks that don't need to build snapshot
if bc.skipBlock(err, it) {
@@ -1975,11 +1998,28 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
ptime := time.Since(pstart)

vstart := time.Now()
if err := bc.validator.ValidateState(block, statedb, receipts, usedGas); err != nil {
bc.reportBlock(block, receipts, err)
followupInterrupt.Store(true)
return it.index, err
// Async validate if minerMode
asyncValidateStateCh := make(chan error, 1)
if minerMode {
header := block.Header()
// Can not validate root concurrently
if root := statedb.IntermediateRoot(bc.chainConfig.IsEIP158(header.Number)); header.Root != root {
err := fmt.Errorf("self mined block(hash: %x number %v) verify root err(mined: %x expected: %x) dberr: %w", block.Hash(), block.NumberU64(), header.Root, root, statedb.Error())
bc.reportBlock(block, receipts, err)
followupInterrupt.Store(true)
return it.index, err
}
go func() {
asyncValidateStateCh <- bc.validator.ValidateState(block, statedb, receipts, usedGas, true)
}()
} else {
if err := bc.validator.ValidateState(block, statedb, receipts, usedGas, false); err != nil {
bc.reportBlock(block, receipts, err)
followupInterrupt.Store(true)
return it.index, err
}
}

vtime := time.Since(vstart)
proctime := time.Since(start) // processing + validation
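
For a self-mined block the state root is still compared inline (the diff notes the root cannot be validated concurrently), while the remaining ValidateState checks are pushed onto a goroutine and joined after the block data is written; a failure there panics, because the node produced the block itself and a mismatch indicates a local bug rather than a bad peer. A minimal sketch of that inline-check-plus-deferred-join split:

package main

import (
	"errors"
	"fmt"
)

// importBlock sketches the split used for self-mined blocks: the order-sensitive
// root check runs inline and fails fast, the rest of the validation overlaps
// with the write path and is joined before the import is considered done.
func importBlock(rootMatches bool, restOfValidation func() error) error {
	if !rootMatches {
		return errors.New("self mined block verify root err") // inline, fail fast
	}

	asyncCh := make(chan error, 1)
	go func() { asyncCh <- restOfValidation() }()

	// ... block and state are written to the database here ...

	if err := <-asyncCh; err != nil {
		// The node built this block itself, so a late validation failure is a
		// programming error rather than a bad peer: surface it loudly.
		panic(fmt.Errorf("self mined block async verify state err: %w", err))
	}
	return nil
}

func main() {
	fmt.Println(importBlock(true, func() error { return nil }))
}
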

@@ -2015,6 +2055,13 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
if err != nil {
return it.index, err
}
if minerMode {
if err := <-asyncValidateStateCh; err != nil {
panic(fmt.Errorf("self mined block(hash: %x number %v) async verify state err: %w", block.Hash(), block.NumberU64(), err))
}
}
bc.CacheBlock(block.Hash(), block)

// Update the metrics touched during block commit
accountCommitTimer.Update(statedb.AccountCommits) // Account commits are complete, we can mark them
storageCommitTimer.Update(statedb.StorageCommits) // Storage commits are complete, we can mark them
@@ -2033,10 +2080,14 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
stats.usedGas += usedGas

var snapDiffItems, snapBufItems common.StorageSize
if bc.snaps != nil {
if bc.snaps != nil && !minerMode {
snapDiffItems, snapBufItems = bc.snaps.Size()
}
trieDiffNodes, trieBufNodes, trieImmutableBufNodes, _ := bc.triedb.Size()

var trieDiffNodes, trieBufNodes, trieImmutableBufNodes common.StorageSize
if !minerMode {
trieDiffNodes, trieBufNodes, trieImmutableBufNodes, _ = bc.triedb.Size()
}
stats.report(chain, it.index, snapDiffItems, snapBufItems, trieDiffNodes, trieBufNodes, trieImmutableBufNodes, setHead)
blockGasUsedGauge.Update(int64(block.GasUsed()) / 1000000)

@@ -2536,10 +2587,17 @@ func (bc *BlockChain) SetCanonical(head *types.Block) (common.Hash, error) {
return common.Hash{}, err
}
}
bc.writeHeadBlock(head)

wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
bc.writeHeadBlock(head)
}()
// Emit events
logs := bc.collectLogs(head, false)
wg.Wait()

bc.chainFeed.Send(ChainEvent{Block: head, Hash: head.Hash(), Logs: logs})
if len(logs) > 0 {
bc.logsFeed.Send(logs)
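
SetCanonical now writes the new head block and collects its logs in parallel, joining on a WaitGroup before the chain event is emitted so subscribers still observe a fully written head. A minimal sketch of that fork-and-join shape:

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		time.Sleep(10 * time.Millisecond) // stands in for bc.writeHeadBlock(head)
	}()

	logs := []string{"log-a", "log-b"} // stands in for bc.collectLogs(head, false)
	wg.Wait()                          // head is durable before events go out

	fmt.Printf("emit ChainEvent with %d logs\n", len(logs)) // bc.chainFeed.Send(...)
}
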
2 changes: 1 addition & 1 deletion core/blockchain_test.go
@@ -169,7 +169,7 @@ func testBlockChainImport(chain types.Blocks, blockchain *BlockChain) error {
blockchain.reportBlock(block, receipts, err)
return err
}
err = blockchain.validator.ValidateState(block, statedb, receipts, usedGas)
err = blockchain.validator.ValidateState(block, statedb, receipts, usedGas, false)
if err != nil {
blockchain.reportBlock(block, receipts, err)
return err
11 changes: 7 additions & 4 deletions core/state/snapshot/difflayer.go
@@ -40,7 +40,7 @@ var (
// Note, bumping this up might drastically increase the size of the bloom
// filters that's stored in every diff layer. Don't do that without fully
// understanding all the implications.
aggregatorMemoryLimit = uint64(4 * 1024 * 1024)
aggregatorMemoryLimit = uint64(4 * 1024 * 1024 * 3) // opBNB block gas limit is about 3 times to Ethereum

// aggregatorItemLimit is an approximate number of items that will end up
// in the aggregator layer before it's flushed out to disk. A plain account
@@ -50,23 +50,26 @@ var (
// smaller number to be on the safe side.
aggregatorItemLimit = aggregatorMemoryLimit / 42

// bloomItemLimit is an approximate number of all difflayer items (128 difflayers + 1 aggregatorlayer)
bloomItemLimit = 25*10000*5 + aggregatorItemLimit

// bloomTargetError is the target false positive rate when the aggregator
// layer is at its fullest. The actual value will probably move around up
// and down from this number, it's mostly a ballpark figure.
//
// Note, dropping this down might drastically increase the size of the bloom
// filters that's stored in every diff layer. Don't do that without fully
// understanding all the implications.
bloomTargetError = 0.02
bloomTargetError = 0.01

// bloomSize is the ideal bloom filter size given the maximum number of items
// it's expected to hold and the target false positive error rate.
bloomSize = math.Ceil(float64(aggregatorItemLimit) * math.Log(bloomTargetError) / math.Log(1/math.Pow(2, math.Log(2))))
bloomSize = math.Ceil(float64(bloomItemLimit) * math.Log(bloomTargetError) / math.Log(1/math.Pow(2, math.Log(2))))

// bloomFuncs is the ideal number of bits a single entry should set in the
// bloom filter to keep its size to a minimum (given it's size and maximum
// entry count).
bloomFuncs = math.Round((bloomSize / float64(aggregatorItemLimit)) * math.Log(2))
bloomFuncs = math.Round((bloomSize / float64(bloomItemLimit)) * math.Log(2))

// the bloom offsets are runtime constants which determines which part of the
// account/storage hash the hasher functions looks at, to determine the
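
The bloom filter is now sized for all difflayer items (128 diff layers plus the aggregator layer) instead of the aggregator alone, the aggregator memory limit is tripled to match opBNB's roughly three-times-Ethereum block gas limit, and the target false-positive rate is tightened from 2% to 1%. The sizing formulas are the standard Bloom-filter ones, m = ceil(n*ln(p)/ln(1/2^ln 2)) bits and k = round((m/n)*ln 2) hash functions; a small sketch evaluating them with the new constants:

package main

import (
	"fmt"
	"math"
)

func main() {
	aggregatorMemoryLimit := uint64(4 * 1024 * 1024 * 3)
	aggregatorItemLimit := aggregatorMemoryLimit / 42
	bloomItemLimit := 25*10000*5 + aggregatorItemLimit // 128 difflayers + aggregator layer
	bloomTargetError := 0.01

	// m = ceil(n * ln(p) / ln(1 / 2^ln 2)), k = round((m/n) * ln 2)
	bloomSize := math.Ceil(float64(bloomItemLimit) * math.Log(bloomTargetError) / math.Log(1/math.Pow(2, math.Log(2))))
	bloomFuncs := math.Round((bloomSize / float64(bloomItemLimit)) * math.Log(2))

	fmt.Printf("items=%d bits=%.0f (~%.1f MiB) hash funcs=%.0f\n",
		bloomItemLimit, bloomSize, bloomSize/8/1024/1024, bloomFuncs)
}
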
1 change: 0 additions & 1 deletion core/state/statedb.go
@@ -1375,7 +1375,6 @@ func (s *StateDB) Commit(block uint64, deleteEmptyObjects bool) (common.Hash, er

if metrics.EnabledExpensive {
defer func(start time.Time) {
s.AccountCommits += time.Since(start)
accountUpdatedMeter.Mark(int64(s.AccountUpdated))
storageUpdatedMeter.Mark(int64(s.StorageUpdated))
accountDeletedMeter.Mark(int64(s.AccountDeleted))
(The remaining changed files are not shown here.)
