diff --git a/core/blockchain.go b/core/blockchain.go index fc60ea5bc1..532dcce9ed 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -165,7 +165,7 @@ type CacheConfig struct { } // triedbConfig derives the configures for trie database. -func (c *CacheConfig) triedbConfig() *trie.Config { +func (c *CacheConfig) triedbConfig(keepWatchFunc pathdb.KeepRecordWatchFunc) *trie.Config { config := &trie.Config{Preimages: c.Preimages} if c.StateScheme == rawdb.HashScheme { config.HashDB = &hashdb.Config{ @@ -179,8 +179,7 @@ func (c *CacheConfig) triedbConfig() *trie.Config { CleanCacheSize: c.TrieCleanLimit * 1024 * 1024, DirtyCacheSize: c.TrieDirtyLimit * 1024 * 1024, ProposeBlockInterval: c.ProposeBlockInterval, - KeepCh: make(chan *pathdb.KeepRecord), - WaitKeepCh: make(chan struct{}), + KeepFunc: keepWatchFunc, } } return config @@ -293,9 +292,14 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis if cacheConfig == nil { cacheConfig = defaultCacheConfig } - + opts := &proofKeeperOptions{ + enable: true, // todo + watchStartKeepCh: make(chan *pathdb.KeepRecord), + notifyFinishKeepCh: make(chan struct{}), + } + proofKeeper := newProofKeeper(opts) // Open trie database with provided config - trieConfig := cacheConfig.triedbConfig() + trieConfig := cacheConfig.triedbConfig(proofKeeper.GetKeepRecordWatchFunc()) triedb := trie.NewDatabase(db, trieConfig) // Setup the genesis block, commit the provided genesis specification @@ -344,19 +348,11 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis bc.prefetcher = newStatePrefetcher(chainConfig, bc, engine) bc.processor = NewStateProcessor(chainConfig, bc, engine) - var err error - if cacheConfig.StateScheme == rawdb.PathScheme && trieConfig.PathDB.TrieNodeBufferType == pathdb.NodeBufferList { - opts := &proofKeeperOptions{ - enable: true, - watchStartKeepCh: trieConfig.PathDB.KeepCh, - notifyFinishKeepCh: trieConfig.PathDB.WaitKeepCh, - blockChain: bc, - } - bc.ProofKeeper, err = newProofKeeper(db, opts) - if err != nil { - return nil, err - } + err := proofKeeper.Start(bc, db) + if err != nil { + return nil, err } + bc.ProofKeeper = proofKeeper bc.hc, err = NewHeaderChain(db, chainConfig, engine, bc.insertStopped) if err != nil { diff --git a/core/proof_keeper.go b/core/proof_keeper.go index 9428157ffb..8a812308a2 100644 --- a/core/proof_keeper.go +++ b/core/proof_keeper.go @@ -12,7 +12,6 @@ import ( "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/ethclient/gethclient" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/log" trie2 "github.com/ethereum/go-ethereum/trie" @@ -56,7 +55,6 @@ type proofKeeperOptions struct { enable bool watchStartKeepCh chan *pathdb.KeepRecord notifyFinishKeepCh chan struct{} - blockChain *BlockChain } // todo: ensure ancient sync write?? @@ -66,41 +64,71 @@ type proofKeeperOptions struct { // todo gc type ProofKeeper struct { opts *proofKeeperOptions + blockChain *BlockChain keeperMetaDB ethdb.Database proofDataDB *rawdb.ResettableFreezer - selfClient *gethclient.Client queryProofCh chan uint64 waitQueryProofCh chan *proofDataRecord } -func newProofKeeper(keeperMetaDB ethdb.Database, opts *proofKeeperOptions) (*ProofKeeper, error) { +func newProofKeeper(opts *proofKeeperOptions) *ProofKeeper { + keeper := &ProofKeeper{ + opts: opts, + queryProofCh: make(chan uint64), + waitQueryProofCh: make(chan *proofDataRecord), + } + log.Info("Succeed to init proof keeper", "options", opts) + return keeper +} + +func (keeper *ProofKeeper) Start(blockChain *BlockChain, keeperMetaDB ethdb.Database) error { var ( err error ancientDir string - keeper *ProofKeeper ) - if ancientDir, err = keeperMetaDB.AncientDatadir(); err != nil { + keeper.blockChain = blockChain + keeper.keeperMetaDB = keeperMetaDB + if ancientDir, err = keeper.keeperMetaDB.AncientDatadir(); err != nil { log.Error("Failed to get ancient data dir", "error", err) - return nil, err - } - keeper = &ProofKeeper{ - opts: opts, - keeperMetaDB: keeperMetaDB, + return err } if keeper.proofDataDB, err = rawdb.NewProofFreezer(ancientDir, false); err != nil { log.Error("Failed to new proof ancient freezer", "error", err) - return nil, err + return err } - keeper.queryProofCh = make(chan uint64) - keeper.waitQueryProofCh = make(chan *proofDataRecord) - go keeper.eventLoop() + log.Info("Succeed to start proof keeper") + return nil +} - log.Info("Succeed to init proof keeper", "options", opts) - return keeper, nil +func (keeper *ProofKeeper) Stop() error { + return nil +} + +func (keeper *ProofKeeper) GetKeepRecordWatchFunc() pathdb.KeepRecordWatchFunc { + return func(keepRecord *pathdb.KeepRecord) { + if keeper == nil { + return + } + if keeper.opts == nil || keeper.opts.watchStartKeepCh == nil || keeper.opts.notifyFinishKeepCh == nil { + return + } + if !keeper.opts.enable { + return + } + if keepRecord.BlockID == 0 || keepRecord.KeepInterval == 0 { + return + } + if keepRecord.BlockID%keepRecord.KeepInterval != 0 { + return + } + keeper.opts.watchStartKeepCh <- keepRecord + <-keeper.opts.notifyFinishKeepCh + log.Info("Succeed to keep proof in stop", "record", keepRecord) + } } func (keeper *ProofKeeper) getInnerProof(kRecord *pathdb.KeepRecord) (*proofDataRecord, error) { @@ -112,10 +140,10 @@ func (keeper *ProofKeeper) getInnerProof(kRecord *pathdb.KeepRecord) (*proofData pRecord *proofDataRecord ) - if header = keeper.opts.blockChain.GetHeaderByNumber(kRecord.BlockID); header == nil { + if header = keeper.blockChain.GetHeaderByNumber(kRecord.BlockID); header == nil { return nil, fmt.Errorf("block is not found, block_id=%d", kRecord.BlockID) } - if stateDB, err = keeper.opts.blockChain.StateAt(header.Root); err != nil { + if stateDB, err = keeper.blockChain.StateAt(header.Root); err != nil { return nil, err } if worldTrie, err = trie2.NewStateTrie(trie2.StateTrieID(header.Root), stateDB.Database().TrieDB()); err != nil { @@ -264,7 +292,6 @@ func (keeper *ProofKeeper) truncateKeeperMetaRecordHeadIfNeeded(blockID uint64) hasTruncated = true rawdb.DeleteKeeperMeta(batch, m.BlockID) } - } err = batch.Write() if err != nil { diff --git a/trie/triedb/pathdb/database.go b/trie/triedb/pathdb/database.go index 854d8763f5..2a54fd161f 100644 --- a/trie/triedb/pathdb/database.go +++ b/trie/triedb/pathdb/database.go @@ -100,8 +100,7 @@ type Config struct { DirtyCacheSize int // Maximum memory allowance (in bytes) for caching dirty nodes ReadOnly bool // Flag whether the database is opened in read only mode. ProposeBlockInterval uint64 // Propose block to L1 block interval. - KeepCh chan *KeepRecord - WaitKeepCh chan struct{} + KeepFunc KeepRecordWatchFunc } // sanitize checks the provided user configurations and changes anything that's @@ -320,7 +319,7 @@ func (db *Database) Enable(root common.Hash) error { } // Re-construct a new disk layer backed by persistent state // with **empty clean cache and node buffer**. - nb := NewTrieNodeBuffer(db.diskdb, db.config.TrieNodeBufferType, db.bufferSize, nil, 0, db.config.ProposeBlockInterval, db.config.KeepCh, db.config.WaitKeepCh) + nb := NewTrieNodeBuffer(db.diskdb, db.config.TrieNodeBufferType, db.bufferSize, nil, 0, db.config.ProposeBlockInterval, db.config.KeepFunc) dl := newDiskLayer(root, 0, db, nil, nb) nb.setClean(dl.cleans) db.tree.reset(dl) diff --git a/trie/triedb/pathdb/disklayer.go b/trie/triedb/pathdb/disklayer.go index 20e601806b..129ac501eb 100644 --- a/trie/triedb/pathdb/disklayer.go +++ b/trie/triedb/pathdb/disklayer.go @@ -119,13 +119,12 @@ func NewTrieNodeBuffer( limit int, nodes map[common.Hash]map[string]*trienode.Node, layers, proposeBlockInterval uint64, - keepCh chan *KeepRecord, - waitKeepCh chan struct{}, + keepFunc KeepRecordWatchFunc, ) trienodebuffer { log.Info("init trie node buffer", "type", nodeBufferTypeToString[trieNodeBufferType]) switch trieNodeBufferType { case NodeBufferList: - return newNodeBufferList(db, uint64(limit), nodes, layers, proposeBlockInterval, keepCh, waitKeepCh) + return newNodeBufferList(db, uint64(limit), nodes, layers, proposeBlockInterval, keepFunc) case AsyncNodeBuffer: return newAsyncNodeBuffer(limit, nodes, layers) case SyncNodeBuffer: diff --git a/trie/triedb/pathdb/journal.go b/trie/triedb/pathdb/journal.go index af6d0f60a6..41cb203d1d 100644 --- a/trie/triedb/pathdb/journal.go +++ b/trie/triedb/pathdb/journal.go @@ -130,7 +130,7 @@ func (db *Database) loadLayers() layer { log.Info("Failed to load journal, discard it", "err", err) } // Return single layer with persistent state. - nb := NewTrieNodeBuffer(db.diskdb, db.config.TrieNodeBufferType, db.bufferSize, nil, 0, db.config.ProposeBlockInterval, db.config.KeepCh, db.config.WaitKeepCh) + nb := NewTrieNodeBuffer(db.diskdb, db.config.TrieNodeBufferType, db.bufferSize, nil, 0, db.config.ProposeBlockInterval, db.config.KeepFunc) dl := newDiskLayer(root, rawdb.ReadPersistentStateID(db.diskdb), db, nil, nb) nb.setClean(dl.cleans) return dl @@ -173,7 +173,7 @@ func (db *Database) loadDiskLayer(r *rlp.Stream) (layer, error) { nodes[entry.Owner] = subset } // Calculate the internal state transitions by id difference. - nb := NewTrieNodeBuffer(db.diskdb, db.config.TrieNodeBufferType, db.bufferSize, nodes, id-stored, db.config.ProposeBlockInterval, db.config.KeepCh, db.config.WaitKeepCh) + nb := NewTrieNodeBuffer(db.diskdb, db.config.TrieNodeBufferType, db.bufferSize, nodes, id-stored, db.config.ProposeBlockInterval, db.config.KeepFunc) base := newDiskLayer(root, id, db, nil, nb) nb.setClean(base.cleans) return base, nil diff --git a/trie/triedb/pathdb/nodebufferlist.go b/trie/triedb/pathdb/nodebufferlist.go index a43c7f4af7..d78e9bf45c 100644 --- a/trie/triedb/pathdb/nodebufferlist.go +++ b/trie/triedb/pathdb/nodebufferlist.go @@ -34,6 +34,7 @@ type KeepRecord struct { StateRoot common.Hash KeepInterval uint64 } +type KeepRecordWatchFunc func(*KeepRecord) var _ trienodebuffer = &nodebufferlist{} @@ -68,8 +69,9 @@ type nodebufferlist struct { stopFlushing atomic.Bool // Flag stops writing disk under background. stopCh chan struct{} waitStopCh chan struct{} - notifyKeepCh chan *KeepRecord - waitKeepCh chan struct{} + //notifyKeepCh chan *KeepRecord + //waitKeepCh chan struct{} + keepFunc KeepRecordWatchFunc } // newNodeBufferList initializes the node buffer list with the provided nodes @@ -79,8 +81,7 @@ func newNodeBufferList( nodes map[common.Hash]map[string]*trienode.Node, layers uint64, proposeBlockInterval uint64, - keepCh chan *KeepRecord, - waitKeepCh chan struct{}) *nodebufferlist { + keepFunc KeepRecordWatchFunc) *nodebufferlist { var ( rsevMdNum uint64 dlInMd uint64 @@ -110,20 +111,19 @@ func newNodeBufferList( base := newMultiDifflayer(limit, size, common.Hash{}, nodes, layers) ele := newMultiDifflayer(limit, 0, common.Hash{}, make(map[common.Hash]map[string]*trienode.Node), 0) nf := &nodebufferlist{ - db: db, - wpBlocks: wpBlocks, - rsevMdNum: rsevMdNum, - dlInMd: dlInMd, - limit: limit, - base: base, - head: ele, - tail: ele, - count: 1, - persistID: rawdb.ReadPersistentStateID(db), - stopCh: make(chan struct{}), - waitStopCh: make(chan struct{}), - notifyKeepCh: keepCh, - waitKeepCh: waitKeepCh, + db: db, + wpBlocks: wpBlocks, + rsevMdNum: rsevMdNum, + dlInMd: dlInMd, + limit: limit, + base: base, + head: ele, + tail: ele, + count: 1, + persistID: rawdb.ReadPersistentStateID(db), + stopCh: make(chan struct{}), + waitStopCh: make(chan struct{}), + keepFunc: keepFunc, } go nf.loop() @@ -471,10 +471,7 @@ func (nf *nodebufferlist) diffToBase() { log.Crit("committed block number misaligned", "block", buffer.block) } - if buffer.block != 0 && buffer.block%nf.wpBlocks == 0 && nf.notifyKeepCh != nil { // maybe keep proof - nf.notifyKeepCh <- &KeepRecord{BlockID: buffer.block, StateRoot: buffer.root, KeepInterval: nf.wpBlocks} - <-nf.waitKeepCh - } + nf.keepFunc(&KeepRecord{BlockID: buffer.block, StateRoot: buffer.root, KeepInterval: nf.wpBlocks}) nf.baseMux.Lock() err := nf.base.commit(buffer.root, buffer.id, buffer.block, buffer.layers, buffer.nodes) @@ -534,22 +531,13 @@ func (nf *nodebufferlist) loop() { select { case <-nf.stopCh: // force flush to ensure all proposed-block can be kept by proof keeper - if nf.notifyKeepCh != nil { // maybe keep proof - nf.mux.RLock() - - notifyKeeperFunc := func(buffer *multiDifflayer) bool { - if buffer.block != 0 && buffer.block%nf.wpBlocks == 0 { - keepRecord := &KeepRecord{BlockID: buffer.block, StateRoot: buffer.root, KeepInterval: nf.wpBlocks} - nf.notifyKeepCh <- keepRecord - <-nf.waitKeepCh - log.Info("Succeed to keep proof in stop", "record", keepRecord) - } - return true - } - nf.traverseReverse(notifyKeeperFunc) - - nf.mux.RUnlock() + nf.mux.RLock() + notifyKeeperFunc := func(buffer *multiDifflayer) bool { + nf.keepFunc(&KeepRecord{BlockID: buffer.block, StateRoot: buffer.root, KeepInterval: nf.wpBlocks}) + return true } + nf.traverseReverse(notifyKeeperFunc) + nf.mux.RUnlock() nf.waitStopCh <- struct{}{} return case <-mergeTicker.C: