chore: refine keep watcher
will@2012 committed Apr 19, 2024
1 parent e296cd1 commit 87a8e97
Showing 6 changed files with 91 additions and 82 deletions.
30 changes: 13 additions & 17 deletions core/blockchain.go
@@ -165,7 +165,7 @@ type CacheConfig struct {
}

// triedbConfig derives the configures for trie database.
func (c *CacheConfig) triedbConfig() *trie.Config {
func (c *CacheConfig) triedbConfig(keepWatchFunc pathdb.KeepRecordWatchFunc) *trie.Config {
config := &trie.Config{Preimages: c.Preimages}
if c.StateScheme == rawdb.HashScheme {
config.HashDB = &hashdb.Config{
@@ -179,8 +179,7 @@ func (c *CacheConfig) triedbConfig() *trie.Config {
CleanCacheSize: c.TrieCleanLimit * 1024 * 1024,
DirtyCacheSize: c.TrieDirtyLimit * 1024 * 1024,
ProposeBlockInterval: c.ProposeBlockInterval,
KeepCh: make(chan *pathdb.KeepRecord),
WaitKeepCh: make(chan struct{}),
KeepFunc: keepWatchFunc,
}
}
return config
@@ -293,9 +292,14 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
if cacheConfig == nil {
cacheConfig = defaultCacheConfig
}

opts := &proofKeeperOptions{
enable: true, // todo
watchStartKeepCh: make(chan *pathdb.KeepRecord),
notifyFinishKeepCh: make(chan struct{}),
}
proofKeeper := newProofKeeper(opts)
// Open trie database with provided config
trieConfig := cacheConfig.triedbConfig()
trieConfig := cacheConfig.triedbConfig(proofKeeper.GetKeepRecordWatchFunc())
triedb := trie.NewDatabase(db, trieConfig)

// Setup the genesis block, commit the provided genesis specification
@@ -344,19 +348,11 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
bc.prefetcher = newStatePrefetcher(chainConfig, bc, engine)
bc.processor = NewStateProcessor(chainConfig, bc, engine)

var err error
if cacheConfig.StateScheme == rawdb.PathScheme && trieConfig.PathDB.TrieNodeBufferType == pathdb.NodeBufferList {
opts := &proofKeeperOptions{
enable: true,
watchStartKeepCh: trieConfig.PathDB.KeepCh,
notifyFinishKeepCh: trieConfig.PathDB.WaitKeepCh,
blockChain: bc,
}
bc.ProofKeeper, err = newProofKeeper(db, opts)
if err != nil {
return nil, err
}
err := proofKeeper.Start(bc, db)
if err != nil {
return nil, err
}
bc.ProofKeeper = proofKeeper

bc.hc, err = NewHeaderChain(db, chainConfig, engine, bc.insertStopped)
if err != nil {
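Read together, the blockchain.go hunks move proof-keeper construction ahead of the trie database, so the keeper's watch callback, rather than a channel pair, is what gets threaded into the pathdb config. A consolidated sketch of how NewBlockChain reads after these hunks; error handling and unrelated setup are abbreviated:

opts := &proofKeeperOptions{
	enable:             true,
	watchStartKeepCh:   make(chan *pathdb.KeepRecord),
	notifyFinishKeepCh: make(chan struct{}),
}
proofKeeper := newProofKeeper(opts) // construction no longer needs the chain or the database

// The keeper's callback is handed to pathdb through the trie config.
trieConfig := cacheConfig.triedbConfig(proofKeeper.GetKeepRecordWatchFunc())
triedb := trie.NewDatabase(db, trieConfig)

// ... the BlockChain value bc is assembled as before ...

// Start late-binds the chain and the ancient store and launches the keeper's event loop.
if err := proofKeeper.Start(bc, db); err != nil {
	return nil, err
}
bc.ProofKeeper = proofKeeper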
67 changes: 47 additions & 20 deletions core/proof_keeper.go
@@ -12,7 +12,6 @@ import (
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethclient/gethclient"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
trie2 "github.com/ethereum/go-ethereum/trie"
@@ -56,7 +55,6 @@ type proofKeeperOptions struct {
enable bool
watchStartKeepCh chan *pathdb.KeepRecord
notifyFinishKeepCh chan struct{}
blockChain *BlockChain
}

// todo: ensure ancient sync write??
@@ -66,41 +64,71 @@ type proofKeeperOptions struct {
// todo gc
type ProofKeeper struct {
opts *proofKeeperOptions
blockChain *BlockChain
keeperMetaDB ethdb.Database
proofDataDB *rawdb.ResettableFreezer
selfClient *gethclient.Client

queryProofCh chan uint64
waitQueryProofCh chan *proofDataRecord
}

func newProofKeeper(keeperMetaDB ethdb.Database, opts *proofKeeperOptions) (*ProofKeeper, error) {
func newProofKeeper(opts *proofKeeperOptions) *ProofKeeper {
keeper := &ProofKeeper{
opts: opts,
queryProofCh: make(chan uint64),
waitQueryProofCh: make(chan *proofDataRecord),
}
log.Info("Succeed to init proof keeper", "options", opts)
return keeper
}

func (keeper *ProofKeeper) Start(blockChain *BlockChain, keeperMetaDB ethdb.Database) error {
var (
err error
ancientDir string
keeper *ProofKeeper
)

if ancientDir, err = keeperMetaDB.AncientDatadir(); err != nil {
keeper.blockChain = blockChain
keeper.keeperMetaDB = keeperMetaDB
if ancientDir, err = keeper.keeperMetaDB.AncientDatadir(); err != nil {
log.Error("Failed to get ancient data dir", "error", err)
return nil, err
}
keeper = &ProofKeeper{
opts: opts,
keeperMetaDB: keeperMetaDB,
return err
}
if keeper.proofDataDB, err = rawdb.NewProofFreezer(ancientDir, false); err != nil {
log.Error("Failed to new proof ancient freezer", "error", err)
return nil, err
return err
}

keeper.queryProofCh = make(chan uint64)
keeper.waitQueryProofCh = make(chan *proofDataRecord)

go keeper.eventLoop()
log.Info("Succeed to start proof keeper")
return nil
}

log.Info("Succeed to init proof keeper", "options", opts)
return keeper, nil
func (keeper *ProofKeeper) Stop() error {
return nil
}

func (keeper *ProofKeeper) GetKeepRecordWatchFunc() pathdb.KeepRecordWatchFunc {
return func(keepRecord *pathdb.KeepRecord) {
if keeper == nil {
return
}
if keeper.opts == nil || keeper.opts.watchStartKeepCh == nil || keeper.opts.notifyFinishKeepCh == nil {
return
}
if !keeper.opts.enable {
return
}
if keepRecord.BlockID == 0 || keepRecord.KeepInterval == 0 {
return
}
if keepRecord.BlockID%keepRecord.KeepInterval != 0 {
return
}
keeper.opts.watchStartKeepCh <- keepRecord
<-keeper.opts.notifyFinishKeepCh
log.Info("Succeed to keep proof in stop", "record", keepRecord)
}
}

func (keeper *ProofKeeper) getInnerProof(kRecord *pathdb.KeepRecord) (*proofDataRecord, error) {
Expand All @@ -112,10 +140,10 @@ func (keeper *ProofKeeper) getInnerProof(kRecord *pathdb.KeepRecord) (*proofData
pRecord *proofDataRecord
)

if header = keeper.opts.blockChain.GetHeaderByNumber(kRecord.BlockID); header == nil {
if header = keeper.blockChain.GetHeaderByNumber(kRecord.BlockID); header == nil {
return nil, fmt.Errorf("block is not found, block_id=%d", kRecord.BlockID)
}
if stateDB, err = keeper.opts.blockChain.StateAt(header.Root); err != nil {
if stateDB, err = keeper.blockChain.StateAt(header.Root); err != nil {
return nil, err
}
if worldTrie, err = trie2.NewStateTrie(trie2.StateTrieID(header.Root), stateDB.Database().TrieDB()); err != nil {
@@ -264,7 +292,6 @@ func (keeper *ProofKeeper) truncateKeeperMetaRecordHeadIfNeeded(blockID uint64)
hasTruncated = true
rawdb.DeleteKeeperMeta(batch, m.BlockID)
}

}
err = batch.Write()
if err != nil {
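The event loop that services the callback is not part of this diff, but GetKeepRecordWatchFunc only works if something receives from watchStartKeepCh, keeps the proof, and then signals notifyFinishKeepCh. A hypothetical, heavily simplified sketch of that assumed contract; the real eventLoop presumably also serves proof queries via queryProofCh/waitQueryProofCh and persists records to keeperMetaDB and proofDataDB:

// Hypothetical sketch only; this is not the eventLoop from the repository.
func (keeper *ProofKeeper) eventLoop() {
	for {
		record := <-keeper.opts.watchStartKeepCh
		// getInnerProof (shown above) re-opens the state at the recorded root
		// and builds the proof data for that block.
		if _, err := keeper.getInnerProof(record); err != nil {
			log.Error("Failed to keep proof", "record", record, "err", err)
		}
		// Unblock the pathdb goroutine that delivered the record.
		keeper.opts.notifyFinishKeepCh <- struct{}{}
	}
}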
5 changes: 2 additions & 3 deletions trie/triedb/pathdb/database.go
@@ -100,8 +100,7 @@ type Config struct {
DirtyCacheSize int // Maximum memory allowance (in bytes) for caching dirty nodes
ReadOnly bool // Flag whether the database is opened in read only mode.
ProposeBlockInterval uint64 // Propose block to L1 block interval.
KeepCh chan *KeepRecord
WaitKeepCh chan struct{}
KeepFunc KeepRecordWatchFunc
}

// sanitize checks the provided user configurations and changes anything that's
@@ -320,7 +319,7 @@ func (db *Database) Enable(root common.Hash) error {
}
// Re-construct a new disk layer backed by persistent state
// with **empty clean cache and node buffer**.
nb := NewTrieNodeBuffer(db.diskdb, db.config.TrieNodeBufferType, db.bufferSize, nil, 0, db.config.ProposeBlockInterval, db.config.KeepCh, db.config.WaitKeepCh)
nb := NewTrieNodeBuffer(db.diskdb, db.config.TrieNodeBufferType, db.bufferSize, nil, 0, db.config.ProposeBlockInterval, db.config.KeepFunc)
dl := newDiskLayer(root, 0, db, nil, nb)
nb.setClean(dl.cleans)
db.tree.reset(dl)
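With KeepCh and WaitKeepCh gone, the pathdb Config carries a single function value, so any consumer can decide what keeping a record means. As an illustration only (not part of this change; the field values other than KeepFunc are made up), a caller that just wants to observe keep points could configure the node buffer list like this:

// Hypothetical usage of the new KeepFunc field.
logOnly := func(record *pathdb.KeepRecord) {
	if record == nil || record.KeepInterval == 0 || record.BlockID%record.KeepInterval != 0 {
		return
	}
	log.Info("Would keep proof", "block", record.BlockID, "root", record.StateRoot)
}
cfg := &pathdb.Config{
	TrieNodeBufferType:   pathdb.NodeBufferList,
	DirtyCacheSize:       256 * 1024 * 1024,
	ProposeBlockInterval: 3600,
	KeepFunc:             logOnly,
}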
5 changes: 2 additions & 3 deletions trie/triedb/pathdb/disklayer.go
@@ -119,13 +119,12 @@ func NewTrieNodeBuffer(
limit int,
nodes map[common.Hash]map[string]*trienode.Node,
layers, proposeBlockInterval uint64,
keepCh chan *KeepRecord,
waitKeepCh chan struct{},
keepFunc KeepRecordWatchFunc,
) trienodebuffer {
log.Info("init trie node buffer", "type", nodeBufferTypeToString[trieNodeBufferType])
switch trieNodeBufferType {
case NodeBufferList:
return newNodeBufferList(db, uint64(limit), nodes, layers, proposeBlockInterval, keepCh, waitKeepCh)
return newNodeBufferList(db, uint64(limit), nodes, layers, proposeBlockInterval, keepFunc)
case AsyncNodeBuffer:
return newAsyncNodeBuffer(limit, nodes, layers)
case SyncNodeBuffer:
4 changes: 2 additions & 2 deletions trie/triedb/pathdb/journal.go
@@ -130,7 +130,7 @@ func (db *Database) loadLayers() layer {
log.Info("Failed to load journal, discard it", "err", err)
}
// Return single layer with persistent state.
nb := NewTrieNodeBuffer(db.diskdb, db.config.TrieNodeBufferType, db.bufferSize, nil, 0, db.config.ProposeBlockInterval, db.config.KeepCh, db.config.WaitKeepCh)
nb := NewTrieNodeBuffer(db.diskdb, db.config.TrieNodeBufferType, db.bufferSize, nil, 0, db.config.ProposeBlockInterval, db.config.KeepFunc)
dl := newDiskLayer(root, rawdb.ReadPersistentStateID(db.diskdb), db, nil, nb)
nb.setClean(dl.cleans)
return dl
@@ -173,7 +173,7 @@ func (db *Database) loadDiskLayer(r *rlp.Stream) (layer, error) {
nodes[entry.Owner] = subset
}
// Calculate the internal state transitions by id difference.
nb := NewTrieNodeBuffer(db.diskdb, db.config.TrieNodeBufferType, db.bufferSize, nodes, id-stored, db.config.ProposeBlockInterval, db.config.KeepCh, db.config.WaitKeepCh)
nb := NewTrieNodeBuffer(db.diskdb, db.config.TrieNodeBufferType, db.bufferSize, nodes, id-stored, db.config.ProposeBlockInterval, db.config.KeepFunc)
base := newDiskLayer(root, id, db, nil, nb)
nb.setClean(base.cleans)
return base, nil
62 changes: 25 additions & 37 deletions trie/triedb/pathdb/nodebufferlist.go
@@ -34,6 +34,7 @@ type KeepRecord struct {
StateRoot common.Hash
KeepInterval uint64
}
type KeepRecordWatchFunc func(*KeepRecord)

var _ trienodebuffer = &nodebufferlist{}

@@ -68,8 +69,9 @@ type nodebufferlist struct {
stopFlushing atomic.Bool // Flag stops writing disk under background.
stopCh chan struct{}
waitStopCh chan struct{}
notifyKeepCh chan *KeepRecord
waitKeepCh chan struct{}
//notifyKeepCh chan *KeepRecord
//waitKeepCh chan struct{}
keepFunc KeepRecordWatchFunc
}

// newNodeBufferList initializes the node buffer list with the provided nodes
@@ -79,8 +81,7 @@ func newNodeBufferList(
nodes map[common.Hash]map[string]*trienode.Node,
layers uint64,
proposeBlockInterval uint64,
keepCh chan *KeepRecord,
waitKeepCh chan struct{}) *nodebufferlist {
keepFunc KeepRecordWatchFunc) *nodebufferlist {
var (
rsevMdNum uint64
dlInMd uint64
@@ -110,20 +111,19 @@ func newNodeBufferList(
base := newMultiDifflayer(limit, size, common.Hash{}, nodes, layers)
ele := newMultiDifflayer(limit, 0, common.Hash{}, make(map[common.Hash]map[string]*trienode.Node), 0)
nf := &nodebufferlist{
db: db,
wpBlocks: wpBlocks,
rsevMdNum: rsevMdNum,
dlInMd: dlInMd,
limit: limit,
base: base,
head: ele,
tail: ele,
count: 1,
persistID: rawdb.ReadPersistentStateID(db),
stopCh: make(chan struct{}),
waitStopCh: make(chan struct{}),
notifyKeepCh: keepCh,
waitKeepCh: waitKeepCh,
db: db,
wpBlocks: wpBlocks,
rsevMdNum: rsevMdNum,
dlInMd: dlInMd,
limit: limit,
base: base,
head: ele,
tail: ele,
count: 1,
persistID: rawdb.ReadPersistentStateID(db),
stopCh: make(chan struct{}),
waitStopCh: make(chan struct{}),
keepFunc: keepFunc,
}
go nf.loop()

@@ -471,10 +471,7 @@ func (nf *nodebufferlist) diffToBase() {
log.Crit("committed block number misaligned", "block", buffer.block)
}

if buffer.block != 0 && buffer.block%nf.wpBlocks == 0 && nf.notifyKeepCh != nil { // maybe keep proof
nf.notifyKeepCh <- &KeepRecord{BlockID: buffer.block, StateRoot: buffer.root, KeepInterval: nf.wpBlocks}
<-nf.waitKeepCh
}
nf.keepFunc(&KeepRecord{BlockID: buffer.block, StateRoot: buffer.root, KeepInterval: nf.wpBlocks})

nf.baseMux.Lock()
err := nf.base.commit(buffer.root, buffer.id, buffer.block, buffer.layers, buffer.nodes)
@@ -534,22 +531,13 @@ func (nf *nodebufferlist) loop() {
select {
case <-nf.stopCh:
// force flush to ensure all proposed-block can be kept by proof keeper
if nf.notifyKeepCh != nil { // maybe keep proof
nf.mux.RLock()

notifyKeeperFunc := func(buffer *multiDifflayer) bool {
if buffer.block != 0 && buffer.block%nf.wpBlocks == 0 {
keepRecord := &KeepRecord{BlockID: buffer.block, StateRoot: buffer.root, KeepInterval: nf.wpBlocks}
nf.notifyKeepCh <- keepRecord
<-nf.waitKeepCh
log.Info("Succeed to keep proof in stop", "record", keepRecord)
}
return true
}
nf.traverseReverse(notifyKeeperFunc)

nf.mux.RUnlock()
nf.mux.RLock()
notifyKeeperFunc := func(buffer *multiDifflayer) bool {
nf.keepFunc(&KeepRecord{BlockID: buffer.block, StateRoot: buffer.root, KeepInterval: nf.wpBlocks})
return true
}
nf.traverseReverse(notifyKeeperFunc)
nf.mux.RUnlock()
nf.waitStopCh <- struct{}{}
return
case <-mergeTicker.C:
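On the buffer side the change is symmetric: the block filter (block != 0 and block % wpBlocks == 0) and the blocking handshake now live inside the callback, so both call sites in nodebufferlist collapse to a plain function call. Condensed from the hunks above, the two call sites now read:

// In diffToBase, once the committed buffer has passed its alignment checks:
nf.keepFunc(&KeepRecord{BlockID: buffer.block, StateRoot: buffer.root, KeepInterval: nf.wpBlocks})

// In loop(), on shutdown, every remaining multiDifflayer is offered to the keeper:
nf.mux.RLock()
nf.traverseReverse(func(buffer *multiDifflayer) bool {
	nf.keepFunc(&KeepRecord{BlockID: buffer.block, StateRoot: buffer.root, KeepInterval: nf.wpBlocks})
	return true
})
nf.mux.RUnlock()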
