From 5eda6008c86b0a971d9035c54009be9bf1242d55 Mon Sep 17 00:00:00 2001 From: "will@2012" Date: Fri, 19 Apr 2024 14:14:50 +0800 Subject: [PATCH] chore: polish keeper code --- cmd/utils/flags.go | 12 +++- core/blockchain.go | 8 ++- core/proof_keeper.go | 84 ++++++++++++++++++++-------- core/proof_keeper_test.go | 1 - eth/backend.go | 1 + eth/ethconfig/config.go | 1 + trie/triedb/pathdb/nodebufferlist.go | 22 ++++---- 7 files changed, 90 insertions(+), 39 deletions(-) delete mode 100644 core/proof_keeper_test.go diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index fc9ae0f96a..a4ddd7b283 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -23,7 +23,6 @@ import ( "encoding/hex" "errors" "fmt" - "github.com/ethereum/go-ethereum/core/opcodeCompiler/compiler" "math" "math/big" "net" @@ -35,6 +34,8 @@ import ( "strings" "time" + "github.com/ethereum/go-ethereum/core/opcodeCompiler/compiler" + "github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/accounts/keystore" "github.com/ethereum/go-ethereum/common" @@ -313,6 +314,12 @@ var ( Value: pathdb.DefaultProposeBlockInterval, Category: flags.StateCategory, } + EnableProofKeeperFlag = &cli.BoolFlag{ + Name: "pathdb.enableproofkeeper", + Usage: "Enable trie db proof keeper for get proposed proof", + Value: false, + Category: flags.StateCategory, + } StateHistoryFlag = &cli.Uint64Flag{ Name: "history.state", Usage: "Number of recent blocks to retain state history for (default = 90,000 blocks, 0 = entire chain)", @@ -1881,6 +1888,9 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) { if ctx.IsSet(ProposeBlockIntervalFlag.Name) { cfg.ProposeBlockInterval = ctx.Uint64(ProposeBlockIntervalFlag.Name) } + if ctx.IsSet(EnableProofKeeperFlag.Name) { + cfg.EnableProofKeeper = ctx.Bool(EnableProofKeeperFlag.Name) + } if ctx.String(GCModeFlag.Name) == "archive" && cfg.TransactionHistory != 0 { cfg.TransactionHistory = 0 log.Warn("Disabled transaction unindexing for archive node") diff --git a/core/blockchain.go b/core/blockchain.go index 532dcce9ed..702ac4061d 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -47,7 +47,6 @@ import ( "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" - "github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/trie" "github.com/ethereum/go-ethereum/trie/triedb/hashdb" "github.com/ethereum/go-ethereum/trie/triedb/pathdb" @@ -157,11 +156,11 @@ type CacheConfig struct { StateScheme string // Scheme used to store ethereum states and merkle tree nodes on top PathNodeBuffer pathdb.NodeBufferType // Type of trienodebuffer to cache trie nodes in disklayer ProposeBlockInterval uint64 // Propose block to L1 block interval. + EnableProofKeeper bool // Whether to enable proof keeper SnapshotNoBuild bool // Whether the background generation is allowed SnapshotWait bool // Wait for snapshot construction on startup. TODO(karalabe): This is a dirty hack for testing, nuke it TrieCommitInterval uint64 // Define a block height interval, commit trie every TrieCommitInterval block height. - RpcClient *rpc.Client } // triedbConfig derives the configures for trie database. @@ -293,7 +292,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis cacheConfig = defaultCacheConfig } opts := &proofKeeperOptions{ - enable: true, // todo + enable: cacheConfig.EnableProofKeeper, watchStartKeepCh: make(chan *pathdb.KeepRecord), notifyFinishKeepCh: make(chan struct{}), } @@ -1048,6 +1047,9 @@ func (bc *BlockChain) Stop() { if err := bc.triedb.Journal(bc.CurrentBlock().Root); err != nil { log.Info("Failed to journal in-memory trie nodes", "err", err) } + if err := bc.ProofKeeper.Stop(); err != nil { + log.Info("Failed to stop proof keeper", "err", err) + } } else { // Ensure the state of a recent block is also stored to disk before exiting. // We're writing three different states to catch different restart scenarios: diff --git a/core/proof_keeper.go b/core/proof_keeper.go index 8a812308a2..5ad226ae12 100644 --- a/core/proof_keeper.go +++ b/core/proof_keeper.go @@ -14,6 +14,7 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" trie2 "github.com/ethereum/go-ethereum/trie" "github.com/ethereum/go-ethereum/trie/triedb/pathdb" ) @@ -24,6 +25,10 @@ const ( var ( l2ToL1MessagePasserAddr = common.HexToAddress(l2ToL1MessagePasser) + + addProofTimer = metrics.NewRegisteredTimer("proofkeeper/addproof/time", nil) + getInnerProofTimer = metrics.NewRegisteredTimer("proofkeeper/getinnerproof/time", nil) + queryProofTimer = metrics.NewRegisteredTimer("proofkeeper/queryproof/time", nil) ) // keeperMetaRecord is used to ensure proof continuous in scenarios such as enable/disable keeper, interval changes, reorg, etc. @@ -50,18 +55,13 @@ type proofDataRecord struct { StorageProof []common.StorageResult `json:"storageProof"` } -// todo: move metadb to opts. type proofKeeperOptions struct { enable bool watchStartKeepCh chan *pathdb.KeepRecord notifyFinishKeepCh chan struct{} } -// todo: ensure ancient sync write?? -// add metris -// add ut -// polish log -// todo gc +// ProofKeeper is used to store proposed proof and op-proposer can query. type ProofKeeper struct { opts *proofKeeperOptions blockChain *BlockChain @@ -70,6 +70,8 @@ type ProofKeeper struct { queryProofCh chan uint64 waitQueryProofCh chan *proofDataRecord + stopCh chan struct{} + waitStopCh chan error } func newProofKeeper(opts *proofKeeperOptions) *ProofKeeper { @@ -77,12 +79,18 @@ func newProofKeeper(opts *proofKeeperOptions) *ProofKeeper { opts: opts, queryProofCh: make(chan uint64), waitQueryProofCh: make(chan *proofDataRecord), + stopCh: make(chan struct{}), + waitStopCh: make(chan error), } log.Info("Succeed to init proof keeper", "options", opts) return keeper } +// Start is used to start event loop. func (keeper *ProofKeeper) Start(blockChain *BlockChain, keeperMetaDB ethdb.Database) error { + if !keeper.opts.enable { + return nil + } var ( err error ancientDir string @@ -104,10 +112,19 @@ func (keeper *ProofKeeper) Start(blockChain *BlockChain, keeperMetaDB ethdb.Data return nil } +// Stop is used to sync ancient db and stop the event loop. func (keeper *ProofKeeper) Stop() error { - return nil + if !keeper.opts.enable { + return nil + } + + close(keeper.stopCh) + err := <-keeper.waitStopCh + log.Info("Succeed to stop proof keeper", "error", err) + return err } +// GetKeepRecordWatchFunc returns a keeper callback func which is used by pathdb node buffer list. func (keeper *ProofKeeper) GetKeepRecordWatchFunc() pathdb.KeepRecordWatchFunc { return func(keepRecord *pathdb.KeepRecord) { if keeper == nil { @@ -125,20 +142,32 @@ func (keeper *ProofKeeper) GetKeepRecordWatchFunc() pathdb.KeepRecordWatchFunc { if keepRecord.BlockID%keepRecord.KeepInterval != 0 { return } + + startTimestamp := time.Now() + defer func() { + addProofTimer.UpdateSince(startTimestamp) + log.Info("Succeed to keep proof", "record", keepRecord, "elapsed", common.PrettyDuration(time.Since(startTimestamp))) + }() + keeper.opts.watchStartKeepCh <- keepRecord <-keeper.opts.notifyFinishKeepCh - log.Info("Succeed to keep proof in stop", "record", keepRecord) } } func (keeper *ProofKeeper) getInnerProof(kRecord *pathdb.KeepRecord) (*proofDataRecord, error) { var ( - err error - header *types.Header - stateDB *state.StateDB - worldTrie *trie2.StateTrie - pRecord *proofDataRecord + err error + header *types.Header + stateDB *state.StateDB + worldTrie *trie2.StateTrie + accountProof common.ProofList + pRecord *proofDataRecord ) + startTimestamp := time.Now() + defer func() { + getInnerProofTimer.UpdateSince(startTimestamp) + log.Info("Succeed to get proof", "proof_record", pRecord, "error", err, "elapsed", common.PrettyDuration(time.Since(startTimestamp))) + }() if header = keeper.blockChain.GetHeaderByNumber(kRecord.BlockID); header == nil { return nil, fmt.Errorf("block is not found, block_id=%d", kRecord.BlockID) @@ -149,7 +178,6 @@ func (keeper *ProofKeeper) getInnerProof(kRecord *pathdb.KeepRecord) (*proofData if worldTrie, err = trie2.NewStateTrie(trie2.StateTrieID(header.Root), stateDB.Database().TrieDB()); err != nil { return nil, err } - var accountProof common.ProofList if err = worldTrie.Prove(crypto.Keccak256(l2ToL1MessagePasserAddr.Bytes()), &accountProof); err != nil { return nil, err } @@ -165,27 +193,26 @@ func (keeper *ProofKeeper) getInnerProof(kRecord *pathdb.KeepRecord) (*proofData StorageProof: make([]common.StorageResult, 0), } err = stateDB.Error() - log.Info("Succeed to get proof", "proof_record", pRecord) return pRecord, err } func (keeper *ProofKeeper) eventLoop() { var ( - putKeeperMetaRecordOnce bool // default = false - ancientInitSequenceID uint64 // default = 0 + err error + putKeeperMetaRecordOnce bool + ancientInitSequenceID uint64 ) + for { select { case keepRecord := <-keeper.opts.watchStartKeepCh: - // log.Info("keep proof", "record", keepRecord) var ( hasTruncatedMeta bool curProofID uint64 - startTimestamp time.Time + proofRecord *proofDataRecord ) - startTimestamp = time.Now() - proofRecord, err := keeper.getInnerProof(keepRecord) + proofRecord, err = keeper.getInnerProof(keepRecord) if err == nil { hasTruncatedMeta = keeper.truncateKeeperMetaRecordHeadIfNeeded(keepRecord.BlockID) metaList := keeper.getKeeperMetaRecordList() @@ -213,11 +240,12 @@ func (keeper *ProofKeeper) eventLoop() { proofRecord.ProofID = curProofID keeper.putProofDataRecord(proofRecord) } - log.Info("Keep a new proof", + log.Info("Succeed to keep a new proof", "block_id", keepRecord.BlockID, "state_root", keepRecord.StateRoot.String(), - "error", err, "elapsed", common.PrettyDuration(time.Since(startTimestamp))) + "error", err) keeper.opts.notifyFinishKeepCh <- struct{}{} + case queryBlockID := <-keeper.queryProofCh: var resultProofRecord *proofDataRecord metaList := keeper.getKeeperMetaRecordList() @@ -238,6 +266,14 @@ func (keeper *ProofKeeper) eventLoop() { } } keeper.waitQueryProofCh <- resultProofRecord + + case <-keeper.stopCh: + err = keeper.proofDataDB.Sync() + if err == nil { + err = keeper.proofDataDB.Close() + } + keeper.waitStopCh <- err + return } } } @@ -308,7 +344,6 @@ func (keeper *ProofKeeper) putKeeperMetaRecord(m *keeperMetaRecord) { } rawdb.PutKeeperMeta(keeper.keeperMetaDB, m.BlockID, meta) log.Info("Succeed to put keeper meta", "record", m) - } // proof data func @@ -405,6 +440,7 @@ func (keeper *ProofKeeper) QueryProposeProof(blockID uint64, stateRoot common.Ha ) startTimestamp = time.Now() defer func() { + queryProofTimer.UpdateSince(startTimestamp) log.Info("Query propose proof", "block_id", blockID, "state_root", stateRoot.String(), diff --git a/core/proof_keeper_test.go b/core/proof_keeper_test.go deleted file mode 100644 index 9a8bc9592b..0000000000 --- a/core/proof_keeper_test.go +++ /dev/null @@ -1 +0,0 @@ -package core diff --git a/eth/backend.go b/eth/backend.go index 765b96caa5..017848a6c0 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -209,6 +209,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { TrieCommitInterval: config.TrieCommitInterval, PathNodeBuffer: config.PathNodeBuffer, ProposeBlockInterval: config.ProposeBlockInterval, + EnableProofKeeper: config.EnableProofKeeper, } ) // Override the chain config with provided settings. diff --git a/eth/ethconfig/config.go b/eth/ethconfig/config.go index 30b2b848af..4c5c668685 100644 --- a/eth/ethconfig/config.go +++ b/eth/ethconfig/config.go @@ -114,6 +114,7 @@ type Config struct { StateScheme string `toml:",omitempty"` PathNodeBuffer pathdb.NodeBufferType `toml:",omitempty"` // Type of trienodebuffer to cache trie nodes in disklayer ProposeBlockInterval uint64 `toml:",omitempty"` // Keep the same with op-proposer propose block interval + EnableProofKeeper bool `toml:",omitempty"` // Whether to enable proof keeper // RequiredBlocks is a set of block number -> hash mappings which must be in the // canonical chain of all remote peers. Setting the option makes geth verify the diff --git a/trie/triedb/pathdb/nodebufferlist.go b/trie/triedb/pathdb/nodebufferlist.go index d78e9bf45c..a4c6136360 100644 --- a/trie/triedb/pathdb/nodebufferlist.go +++ b/trie/triedb/pathdb/nodebufferlist.go @@ -69,9 +69,7 @@ type nodebufferlist struct { stopFlushing atomic.Bool // Flag stops writing disk under background. stopCh chan struct{} waitStopCh chan struct{} - //notifyKeepCh chan *KeepRecord - //waitKeepCh chan struct{} - keepFunc KeepRecordWatchFunc + keepFunc KeepRecordWatchFunc } // newNodeBufferList initializes the node buffer list with the provided nodes @@ -471,7 +469,9 @@ func (nf *nodebufferlist) diffToBase() { log.Crit("committed block number misaligned", "block", buffer.block) } - nf.keepFunc(&KeepRecord{BlockID: buffer.block, StateRoot: buffer.root, KeepInterval: nf.wpBlocks}) + if nf.keepFunc != nil { + nf.keepFunc(&KeepRecord{BlockID: buffer.block, StateRoot: buffer.root, KeepInterval: nf.wpBlocks}) + } nf.baseMux.Lock() err := nf.base.commit(buffer.root, buffer.id, buffer.block, buffer.layers, buffer.nodes) @@ -531,13 +531,15 @@ func (nf *nodebufferlist) loop() { select { case <-nf.stopCh: // force flush to ensure all proposed-block can be kept by proof keeper - nf.mux.RLock() - notifyKeeperFunc := func(buffer *multiDifflayer) bool { - nf.keepFunc(&KeepRecord{BlockID: buffer.block, StateRoot: buffer.root, KeepInterval: nf.wpBlocks}) - return true + if nf.keepFunc != nil { + nf.mux.RLock() + notifyKeeperFunc := func(buffer *multiDifflayer) bool { + nf.keepFunc(&KeepRecord{BlockID: buffer.block, StateRoot: buffer.root, KeepInterval: nf.wpBlocks}) + return true + } + nf.traverseReverse(notifyKeeperFunc) + nf.mux.RUnlock() } - nf.traverseReverse(notifyKeeperFunc) - nf.mux.RUnlock() nf.waitStopCh <- struct{}{} return case <-mergeTicker.C: