Skip to content

Commit

Permalink
chore: polish keeper code
Browse files Browse the repository at this point in the history
  • Loading branch information
will@2012 committed Apr 19, 2024
1 parent 87a8e97 commit 5eda600
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 39 deletions.
12 changes: 11 additions & 1 deletion cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"encoding/hex"
"errors"
"fmt"
"github.com/ethereum/go-ethereum/core/opcodeCompiler/compiler"
"math"
"math/big"
"net"
Expand All @@ -35,6 +34,8 @@ import (
"strings"
"time"

"github.com/ethereum/go-ethereum/core/opcodeCompiler/compiler"

"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/accounts/keystore"
"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -313,6 +314,12 @@ var (
Value: pathdb.DefaultProposeBlockInterval,
Category: flags.StateCategory,
}
EnableProofKeeperFlag = &cli.BoolFlag{
Name: "pathdb.enableproofkeeper",
Usage: "Enable trie db proof keeper for get proposed proof",
Value: false,
Category: flags.StateCategory,
}
StateHistoryFlag = &cli.Uint64Flag{
Name: "history.state",
Usage: "Number of recent blocks to retain state history for (default = 90,000 blocks, 0 = entire chain)",
Expand Down Expand Up @@ -1881,6 +1888,9 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
if ctx.IsSet(ProposeBlockIntervalFlag.Name) {
cfg.ProposeBlockInterval = ctx.Uint64(ProposeBlockIntervalFlag.Name)
}
if ctx.IsSet(EnableProofKeeperFlag.Name) {
cfg.EnableProofKeeper = ctx.Bool(EnableProofKeeperFlag.Name)
}
if ctx.String(GCModeFlag.Name) == "archive" && cfg.TransactionHistory != 0 {
cfg.TransactionHistory = 0
log.Warn("Disabled transaction unindexing for archive node")
Expand Down
8 changes: 5 additions & 3 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ import (
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/trie"
"github.com/ethereum/go-ethereum/trie/triedb/hashdb"
"github.com/ethereum/go-ethereum/trie/triedb/pathdb"
Expand Down Expand Up @@ -157,11 +156,11 @@ type CacheConfig struct {
StateScheme string // Scheme used to store ethereum states and merkle tree nodes on top
PathNodeBuffer pathdb.NodeBufferType // Type of trienodebuffer to cache trie nodes in disklayer
ProposeBlockInterval uint64 // Propose block to L1 block interval.
EnableProofKeeper bool // Whether to enable proof keeper
SnapshotNoBuild bool // Whether the background generation is allowed
SnapshotWait bool // Wait for snapshot construction on startup. TODO(karalabe): This is a dirty hack for testing, nuke it

TrieCommitInterval uint64 // Define a block height interval, commit trie every TrieCommitInterval block height.
RpcClient *rpc.Client
}

// triedbConfig derives the configures for trie database.
Expand Down Expand Up @@ -293,7 +292,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
cacheConfig = defaultCacheConfig
}
opts := &proofKeeperOptions{
enable: true, // todo
enable: cacheConfig.EnableProofKeeper,
watchStartKeepCh: make(chan *pathdb.KeepRecord),
notifyFinishKeepCh: make(chan struct{}),
}
Expand Down Expand Up @@ -1048,6 +1047,9 @@ func (bc *BlockChain) Stop() {
if err := bc.triedb.Journal(bc.CurrentBlock().Root); err != nil {
log.Info("Failed to journal in-memory trie nodes", "err", err)
}
if err := bc.ProofKeeper.Stop(); err != nil {
log.Info("Failed to stop proof keeper", "err", err)
}
} else {
// Ensure the state of a recent block is also stored to disk before exiting.
// We're writing three different states to catch different restart scenarios:
Expand Down
84 changes: 60 additions & 24 deletions core/proof_keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
trie2 "github.com/ethereum/go-ethereum/trie"
"github.com/ethereum/go-ethereum/trie/triedb/pathdb"
)
Expand All @@ -24,6 +25,10 @@ const (

var (
l2ToL1MessagePasserAddr = common.HexToAddress(l2ToL1MessagePasser)

addProofTimer = metrics.NewRegisteredTimer("proofkeeper/addproof/time", nil)
getInnerProofTimer = metrics.NewRegisteredTimer("proofkeeper/getinnerproof/time", nil)
queryProofTimer = metrics.NewRegisteredTimer("proofkeeper/queryproof/time", nil)
)

// keeperMetaRecord is used to ensure proof continuous in scenarios such as enable/disable keeper, interval changes, reorg, etc.
Expand All @@ -50,18 +55,13 @@ type proofDataRecord struct {
StorageProof []common.StorageResult `json:"storageProof"`
}

// todo: move metadb to opts.
type proofKeeperOptions struct {
enable bool
watchStartKeepCh chan *pathdb.KeepRecord
notifyFinishKeepCh chan struct{}
}

// todo: ensure ancient sync write??
// add metris
// add ut
// polish log
// todo gc
// ProofKeeper is used to store proposed proof and op-proposer can query.
type ProofKeeper struct {
opts *proofKeeperOptions
blockChain *BlockChain
Expand All @@ -70,19 +70,27 @@ type ProofKeeper struct {

queryProofCh chan uint64
waitQueryProofCh chan *proofDataRecord
stopCh chan struct{}
waitStopCh chan error
}

func newProofKeeper(opts *proofKeeperOptions) *ProofKeeper {
keeper := &ProofKeeper{
opts: opts,
queryProofCh: make(chan uint64),
waitQueryProofCh: make(chan *proofDataRecord),
stopCh: make(chan struct{}),
waitStopCh: make(chan error),
}
log.Info("Succeed to init proof keeper", "options", opts)
return keeper
}

// Start is used to start event loop.
func (keeper *ProofKeeper) Start(blockChain *BlockChain, keeperMetaDB ethdb.Database) error {
if !keeper.opts.enable {
return nil
}
var (
err error
ancientDir string
Expand All @@ -104,10 +112,19 @@ func (keeper *ProofKeeper) Start(blockChain *BlockChain, keeperMetaDB ethdb.Data
return nil
}

// Stop is used to sync ancient db and stop the event loop.
func (keeper *ProofKeeper) Stop() error {
return nil
if !keeper.opts.enable {
return nil
}

close(keeper.stopCh)
err := <-keeper.waitStopCh
log.Info("Succeed to stop proof keeper", "error", err)
return err
}

// GetKeepRecordWatchFunc returns a keeper callback func which is used by pathdb node buffer list.
func (keeper *ProofKeeper) GetKeepRecordWatchFunc() pathdb.KeepRecordWatchFunc {
return func(keepRecord *pathdb.KeepRecord) {
if keeper == nil {
Expand All @@ -125,20 +142,32 @@ func (keeper *ProofKeeper) GetKeepRecordWatchFunc() pathdb.KeepRecordWatchFunc {
if keepRecord.BlockID%keepRecord.KeepInterval != 0 {
return
}

startTimestamp := time.Now()
defer func() {
addProofTimer.UpdateSince(startTimestamp)
log.Info("Succeed to keep proof", "record", keepRecord, "elapsed", common.PrettyDuration(time.Since(startTimestamp)))
}()

keeper.opts.watchStartKeepCh <- keepRecord
<-keeper.opts.notifyFinishKeepCh
log.Info("Succeed to keep proof in stop", "record", keepRecord)
}
}

func (keeper *ProofKeeper) getInnerProof(kRecord *pathdb.KeepRecord) (*proofDataRecord, error) {
var (
err error
header *types.Header
stateDB *state.StateDB
worldTrie *trie2.StateTrie
pRecord *proofDataRecord
err error
header *types.Header
stateDB *state.StateDB
worldTrie *trie2.StateTrie
accountProof common.ProofList
pRecord *proofDataRecord
)
startTimestamp := time.Now()
defer func() {
getInnerProofTimer.UpdateSince(startTimestamp)
log.Info("Succeed to get proof", "proof_record", pRecord, "error", err, "elapsed", common.PrettyDuration(time.Since(startTimestamp)))
}()

if header = keeper.blockChain.GetHeaderByNumber(kRecord.BlockID); header == nil {
return nil, fmt.Errorf("block is not found, block_id=%d", kRecord.BlockID)
Expand All @@ -149,7 +178,6 @@ func (keeper *ProofKeeper) getInnerProof(kRecord *pathdb.KeepRecord) (*proofData
if worldTrie, err = trie2.NewStateTrie(trie2.StateTrieID(header.Root), stateDB.Database().TrieDB()); err != nil {
return nil, err
}
var accountProof common.ProofList
if err = worldTrie.Prove(crypto.Keccak256(l2ToL1MessagePasserAddr.Bytes()), &accountProof); err != nil {
return nil, err
}
Expand All @@ -165,27 +193,26 @@ func (keeper *ProofKeeper) getInnerProof(kRecord *pathdb.KeepRecord) (*proofData
StorageProof: make([]common.StorageResult, 0),
}
err = stateDB.Error()
log.Info("Succeed to get proof", "proof_record", pRecord)
return pRecord, err
}

func (keeper *ProofKeeper) eventLoop() {
var (
putKeeperMetaRecordOnce bool // default = false
ancientInitSequenceID uint64 // default = 0
err error
putKeeperMetaRecordOnce bool
ancientInitSequenceID uint64
)

for {
select {
case keepRecord := <-keeper.opts.watchStartKeepCh:
// log.Info("keep proof", "record", keepRecord)
var (
hasTruncatedMeta bool
curProofID uint64
startTimestamp time.Time
proofRecord *proofDataRecord
)

startTimestamp = time.Now()
proofRecord, err := keeper.getInnerProof(keepRecord)
proofRecord, err = keeper.getInnerProof(keepRecord)
if err == nil {
hasTruncatedMeta = keeper.truncateKeeperMetaRecordHeadIfNeeded(keepRecord.BlockID)
metaList := keeper.getKeeperMetaRecordList()
Expand Down Expand Up @@ -213,11 +240,12 @@ func (keeper *ProofKeeper) eventLoop() {
proofRecord.ProofID = curProofID
keeper.putProofDataRecord(proofRecord)
}
log.Info("Keep a new proof",
log.Info("Succeed to keep a new proof",
"block_id", keepRecord.BlockID,
"state_root", keepRecord.StateRoot.String(),
"error", err, "elapsed", common.PrettyDuration(time.Since(startTimestamp)))
"error", err)
keeper.opts.notifyFinishKeepCh <- struct{}{}

case queryBlockID := <-keeper.queryProofCh:
var resultProofRecord *proofDataRecord
metaList := keeper.getKeeperMetaRecordList()
Expand All @@ -238,6 +266,14 @@ func (keeper *ProofKeeper) eventLoop() {
}
}
keeper.waitQueryProofCh <- resultProofRecord

case <-keeper.stopCh:
err = keeper.proofDataDB.Sync()
if err == nil {
err = keeper.proofDataDB.Close()
}
keeper.waitStopCh <- err
return
}
}
}
Expand Down Expand Up @@ -308,7 +344,6 @@ func (keeper *ProofKeeper) putKeeperMetaRecord(m *keeperMetaRecord) {
}
rawdb.PutKeeperMeta(keeper.keeperMetaDB, m.BlockID, meta)
log.Info("Succeed to put keeper meta", "record", m)

}

// proof data func
Expand Down Expand Up @@ -405,6 +440,7 @@ func (keeper *ProofKeeper) QueryProposeProof(blockID uint64, stateRoot common.Ha
)
startTimestamp = time.Now()
defer func() {
queryProofTimer.UpdateSince(startTimestamp)
log.Info("Query propose proof",
"block_id", blockID,
"state_root", stateRoot.String(),
Expand Down
1 change: 0 additions & 1 deletion core/proof_keeper_test.go

This file was deleted.

1 change: 1 addition & 0 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
TrieCommitInterval: config.TrieCommitInterval,
PathNodeBuffer: config.PathNodeBuffer,
ProposeBlockInterval: config.ProposeBlockInterval,
EnableProofKeeper: config.EnableProofKeeper,
}
)
// Override the chain config with provided settings.
Expand Down
1 change: 1 addition & 0 deletions eth/ethconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ type Config struct {
StateScheme string `toml:",omitempty"`
PathNodeBuffer pathdb.NodeBufferType `toml:",omitempty"` // Type of trienodebuffer to cache trie nodes in disklayer
ProposeBlockInterval uint64 `toml:",omitempty"` // Keep the same with op-proposer propose block interval
EnableProofKeeper bool `toml:",omitempty"` // Whether to enable proof keeper

// RequiredBlocks is a set of block number -> hash mappings which must be in the
// canonical chain of all remote peers. Setting the option makes geth verify the
Expand Down
22 changes: 12 additions & 10 deletions trie/triedb/pathdb/nodebufferlist.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,7 @@ type nodebufferlist struct {
stopFlushing atomic.Bool // Flag stops writing disk under background.
stopCh chan struct{}
waitStopCh chan struct{}
//notifyKeepCh chan *KeepRecord
//waitKeepCh chan struct{}
keepFunc KeepRecordWatchFunc
keepFunc KeepRecordWatchFunc
}

// newNodeBufferList initializes the node buffer list with the provided nodes
Expand Down Expand Up @@ -471,7 +469,9 @@ func (nf *nodebufferlist) diffToBase() {
log.Crit("committed block number misaligned", "block", buffer.block)
}

nf.keepFunc(&KeepRecord{BlockID: buffer.block, StateRoot: buffer.root, KeepInterval: nf.wpBlocks})
if nf.keepFunc != nil {
nf.keepFunc(&KeepRecord{BlockID: buffer.block, StateRoot: buffer.root, KeepInterval: nf.wpBlocks})
}

nf.baseMux.Lock()
err := nf.base.commit(buffer.root, buffer.id, buffer.block, buffer.layers, buffer.nodes)
Expand Down Expand Up @@ -531,13 +531,15 @@ func (nf *nodebufferlist) loop() {
select {
case <-nf.stopCh:
// force flush to ensure all proposed-block can be kept by proof keeper
nf.mux.RLock()
notifyKeeperFunc := func(buffer *multiDifflayer) bool {
nf.keepFunc(&KeepRecord{BlockID: buffer.block, StateRoot: buffer.root, KeepInterval: nf.wpBlocks})
return true
if nf.keepFunc != nil {
nf.mux.RLock()
notifyKeeperFunc := func(buffer *multiDifflayer) bool {
nf.keepFunc(&KeepRecord{BlockID: buffer.block, StateRoot: buffer.root, KeepInterval: nf.wpBlocks})
return true
}
nf.traverseReverse(notifyKeeperFunc)
nf.mux.RUnlock()
}
nf.traverseReverse(notifyKeeperFunc)
nf.mux.RUnlock()
nf.waitStopCh <- struct{}{}
return
case <-mergeTicker.C:
Expand Down

0 comments on commit 5eda600

Please sign in to comment.