Skip to content

Commit

Permalink
fix: make keeper querier work
Browse files Browse the repository at this point in the history
  • Loading branch information
will@2012 committed Apr 18, 2024
1 parent 0e170c5 commit 7d99989
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 50 deletions.
1 change: 0 additions & 1 deletion core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,6 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
if cacheConfig.StateScheme == rawdb.PathScheme && trieConfig.PathDB.TrieNodeBufferType == pathdb.NodeBufferList {
opts := &proofKeeperOptions{
enable: true,
keepInterval: trieConfig.PathDB.ProposeBlockInterval,
watchStartKeepCh: trieConfig.PathDB.KeepCh,
notifyFinishKeepCh: trieConfig.PathDB.WaitKeepCh,
rpcClient: cacheConfig.RpcClient,
Expand Down
112 changes: 77 additions & 35 deletions core/proof_keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package core

import (
"context"
"encoding/binary"
"encoding/json"
"fmt"
"math/big"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
Expand All @@ -27,33 +29,40 @@ var (
// keeperMetaRecord is used to ensure proof continuous in scenarios such as enable/disable keeper, interval changes, reorg, etc.
// which is stored in kv db.
type keeperMetaRecord struct {
blockID uint64
proofID uint64
proposedInterval uint64
BlockID uint64 `json:"blockID"`
ProofID uint64 `json:"proofID"`
KeepInterval uint64 `json:"keepInterval"`
}

// proofDataRecord is used to store proposed proof data.
// which is stored in ancient db.
type proofDataRecord struct {
ProofID uint64 `json:"proofID"`
BlockID uint64 `json:"blockID"`
StateRoot common.Hash `json:"stateRoot"`
Address common.Address `json:"address"`
AccountProof []string `json:"accountProof"`
Balance *big.Int `json:"balance"`
CodeHash common.Hash `json:"codeHash"`
Nonce uint64 `json:"nonce"`
StorageHash common.Hash `json:"storageHash"`
ProofID uint64 `json:"proofID"`
BlockID uint64 `json:"blockID"`
StateRoot common.Hash `json:"stateRoot"`

Address common.Address `json:"address"`
AccountProof []string `json:"accountProof"`
Balance *big.Int `json:"balance"`
CodeHash common.Hash `json:"codeHash"`
Nonce uint64 `json:"nonce"`
StorageHash common.Hash `json:"storageHash"`
StorageProof []common.StorageResult `json:"storageProof"`
}

// todo: move metadb to opts.
type proofKeeperOptions struct {
enable bool
keepInterval uint64
watchStartKeepCh chan *pathdb.KeepRecord
notifyFinishKeepCh chan struct{}
rpcClient *rpc.Client
}

// todo: ensure ancient sync write??
// add metris
// add ut
// polish log
// todo gc
type ProofKeeper struct {
opts *proofKeeperOptions
keeperMetaDB ethdb.Database
Expand Down Expand Up @@ -95,6 +104,7 @@ func newProofKeeper(keeperMetaDB ethdb.Database, opts *proofKeeperOptions) (*Pro
}

func (keeper *ProofKeeper) getInnerProof(kRecord *pathdb.KeepRecord) (*proofDataRecord, error) {
// todo: maybe need retry
rawPoof, err := keeper.selfClient.GetProof(context.Background(), l2ToL1MessagePasserAddr, nil, big.NewInt(int64(kRecord.BlockID)))

if err != nil {
Expand All @@ -114,6 +124,7 @@ func (keeper *ProofKeeper) getInnerProof(kRecord *pathdb.KeepRecord) (*proofData
CodeHash: rawPoof.CodeHash,
Nonce: rawPoof.Nonce,
StorageHash: rawPoof.StorageHash,
StorageProof: rawPoof.StorageProof,
}
log.Info("Succeed to get proof", "proof_record", pRecord)
return pRecord, nil
Expand All @@ -127,11 +138,13 @@ func (keeper *ProofKeeper) eventLoop() {
for {
select {
case keepRecord := <-keeper.opts.watchStartKeepCh:
log.Info("keep proof", "record", keepRecord)
// log.Info("keep proof", "record", keepRecord)
var (
hasTruncatedMeta bool
curProofID uint64
startTimestamp time.Time
)
startTimestamp = time.Now()
hasTruncatedMeta = keeper.truncateKeeperMetaRecordHeadIfNeeded(keepRecord.BlockID)
metaList := keeper.getKeeperMetaRecordList()
if len(metaList) == 0 {
Expand All @@ -152,14 +165,18 @@ func (keeper *ProofKeeper) eventLoop() {
if hasTruncatedMeta || !putKeeperMetaRecordOnce {
putKeeperMetaRecordOnce = true
keeper.putKeeperMetaRecord(&keeperMetaRecord{
proposedInterval: keeper.opts.keepInterval,
blockID: keepRecord.BlockID,
proofID: curProofID,
BlockID: keepRecord.BlockID,
ProofID: curProofID,
KeepInterval: keepRecord.KeepInterval,
})
}
proofRecord.ProofID = curProofID
keeper.putProofDataRecord(proofRecord)
}
log.Info("Keep a new proof",
"block_id", keepRecord.BlockID,
"state_root", keepRecord.StateRoot.String(),
"error", err, "elapsed", common.PrettyDuration(time.Since(startTimestamp)))
keeper.opts.notifyFinishKeepCh <- struct{}{}
case queryBlockID := <-keeper.queryProofCh:
var resultProofRecord *proofDataRecord
Expand All @@ -169,11 +186,11 @@ func (keeper *ProofKeeper) eventLoop() {
index := len(metaList) - 1
for index >= 0 {
m := metaList[index]
if queryBlockID >= m.blockID {
if queryBlockID%m.proposedInterval != 0 { // check
if queryBlockID >= m.BlockID {
if m.KeepInterval == 0 || queryBlockID%m.KeepInterval != 0 { // check
break
}
proofID = m.proofID + (queryBlockID-m.blockID)/m.proposedInterval
proofID = m.ProofID + (queryBlockID-m.BlockID)/m.KeepInterval
resultProofRecord = keeper.getProofDataRecord(proofID)
break
}
Expand All @@ -187,21 +204,28 @@ func (keeper *ProofKeeper) eventLoop() {

// inner util func list
// keeper meta func
func (keeper *ProofKeeper) getKeeperMetaRecordList() []keeperMetaRecord {
func (keeper *ProofKeeper) getKeeperMetaRecordList() []*keeperMetaRecord {
var (
metaList []keeperMetaRecord
metaList []*keeperMetaRecord
err error
iter ethdb.Iterator
)
iter = rawdb.IterateKeeperMeta(keeper.keeperMetaDB)
defer iter.Release()

for iter.Next() {
keyBlockID := binary.BigEndian.Uint64(iter.Key()[1:])
m := keeperMetaRecord{}
if err = json.Unmarshal(iter.Value(), &m); err != nil {
log.Error("Failed to unmarshal keeper meta record", "key_block_id", keyBlockID, "error", err)
continue
}
metaList = append(metaList, m)
if keyBlockID != m.BlockID {
log.Error("Failed to check consistency between key and value", "key_block_id", keyBlockID, "value_block_id", m.BlockID)
continue
}
log.Info("Keep meta", "key_block_id", keyBlockID, "meta_record", m)
metaList = append(metaList, &m)
}
log.Info("Succeed to get meta list", "list", metaList)
return metaList
Expand All @@ -224,9 +248,9 @@ func (keeper *ProofKeeper) truncateKeeperMetaRecordHeadIfNeeded(blockID uint64)
if err = json.Unmarshal(iter.Value(), &m); err != nil {
continue
}
if m.blockID >= blockID {
if m.BlockID >= blockID {
hasTruncated = true
rawdb.DeleteKeeperMeta(batch, m.blockID)
rawdb.DeleteKeeperMeta(batch, m.BlockID)
}

}
Expand All @@ -243,8 +267,8 @@ func (keeper *ProofKeeper) putKeeperMetaRecord(m *keeperMetaRecord) {
if err != nil {
log.Crit("Failed to marshal keeper meta record", "err", err)
}
rawdb.PutKeeperMeta(keeper.keeperMetaDB, m.blockID, meta)
log.Info("Succeed to add keeper meta", "record", m)
rawdb.PutKeeperMeta(keeper.keeperMetaDB, m.BlockID, meta)
log.Info("Succeed to put keeper meta", "record", m)

}

Expand Down Expand Up @@ -314,7 +338,7 @@ func (keeper *ProofKeeper) putProofDataRecord(p *proofDataRecord) {
log.Crit("Failed to marshal proof data", "err", err)
}
rawdb.PutProofData(keeper.proofDataDB, p.ProofID, proof)
log.Info("Succeed to add proof data", "record", p)
log.Info("Succeed to put proof data", "record", p)
}

// IsProposeProofQuery is used to determine whether it is proposed proof.
Expand All @@ -335,27 +359,45 @@ func (keeper *ProofKeeper) IsProposeProofQuery(address common.Address, storageKe

// QueryProposeProof is used to get proof which is stored in ancient proof.
func (keeper *ProofKeeper) QueryProposeProof(blockID uint64, stateRoot common.Hash) (*common.AccountResult, error) {
var (
result *common.AccountResult
err error
startTimestamp time.Time
)
startTimestamp = time.Now()
defer func() {
log.Info("Query propose proof",
"block_id", blockID,
"state_root", stateRoot.String(),
"error", err, "elapsed", common.PrettyDuration(time.Since(startTimestamp)))
}()
keeper.queryProofCh <- blockID
resultProofRecord := <-keeper.waitQueryProofCh
if resultProofRecord == nil {
// Maybe the keeper was disabled for a certain period of time before.
return nil, fmt.Errorf("proof is not found")
err = fmt.Errorf("proof is not found, block_id=%d", blockID)
return nil, err
}
if resultProofRecord.BlockID != blockID {
// Maybe the keeper was disabled for a certain period of time before.
return nil, fmt.Errorf("proof is not found due to block is mismatch")
// Maybe expected_block_id proof is not kept due to disabled or some bug
err = fmt.Errorf("proof is not found due to block is mismatch, expected_block_id=%d, actual_block_id=%d",
blockID, resultProofRecord.BlockID)
return nil, err
}
if resultProofRecord.StateRoot.Cmp(stateRoot) != 0 {
// Impossible, unless there is a bug.
return nil, fmt.Errorf("proof is not found due to state root is mismatch")
err = fmt.Errorf("proof is not found due to state root is mismatch, expected_state_root=%s, actual_state_root=%s",
stateRoot.String(), resultProofRecord.StateRoot.String())
return nil, err
}
return &common.AccountResult{
result = &common.AccountResult{
Address: resultProofRecord.Address,
AccountProof: resultProofRecord.AccountProof,
Balance: resultProofRecord.Balance,
CodeHash: resultProofRecord.CodeHash,
Nonce: resultProofRecord.Nonce,
StorageHash: resultProofRecord.StorageHash,
StorageProof: nil,
}, nil
StorageProof: resultProofRecord.StorageProof,
}
return result, nil
}
3 changes: 3 additions & 0 deletions core/rawdb/accessors_proof.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ const (
blockNumberLength = 8 // uint64 is 8bytes
)

// todo: cannot panic
// todo: more tips
// todo: inspect proof ancient
// Keeper Meta
func IterateKeeperMeta(db ethdb.Iteratee) ethdb.Iterator {
return NewKeyLengthIterator(db.NewIterator(proofKeeperMetaPrefix, nil), len(proofKeeperMetaPrefix)+blockNumberLength)
Expand Down
4 changes: 2 additions & 2 deletions eth/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func (b *EthAPIBackend) StateAndHeaderByNumber(ctx context.Context, number rpc.B
}
stateDb, err := b.eth.BlockChain().StateAt(header.Root)
if err != nil {
return nil, nil, err
return nil, header, err
}
return stateDb, header, nil
}
Expand All @@ -236,7 +236,7 @@ func (b *EthAPIBackend) StateAndHeaderByNumberOrHash(ctx context.Context, blockN
}
stateDb, err := b.eth.BlockChain().StateAt(header.Root)
if err != nil {
return nil, nil, err
return nil, header, err
}
return stateDb, header, nil
}
Expand Down
23 changes: 15 additions & 8 deletions internal/ethapi/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -677,12 +677,8 @@ type StorageResult struct {
}

// GetProof returns the Merkle-proof for a given account and optionally some storage keys.
func (s *BlockChainAPI) GetProof(ctx context.Context, address common.Address, storageKeys []string, blockNrOrHash rpc.BlockNumberOrHash) (*AccountResult, error) {
var (
err error
header *types.Header
)
header, err = headerByNumberOrHash(ctx, s.b, blockNrOrHash)
func (s *BlockChainAPI) GetProof(ctx context.Context, address common.Address, storageKeys []string, blockNrOrHash rpc.BlockNumberOrHash) (result *AccountResult, err error) {
header, err := headerByNumberOrHash(ctx, s.b, blockNrOrHash)
if err != nil {
return nil, err
}
Expand All @@ -700,9 +696,20 @@ func (s *BlockChainAPI) GetProof(ctx context.Context, address common.Address, st
}

defer func() {
if proofKeeper := s.b.GetProofKeeper(); err != nil && proofKeeper != nil {
if proofKeeper := s.b.GetProofKeeper(); err != nil && proofKeeper != nil && header.Number != nil {
if proofKeeper.IsProposeProofQuery(address, storageKeys, header.Number.Uint64()) {
// todo
if innerResult, innerError := proofKeeper.QueryProposeProof(header.Number.Uint64(), header.Root); innerError == nil {
result = &AccountResult{
Address: innerResult.Address,
AccountProof: innerResult.AccountProof,
Balance: (*hexutil.Big)(innerResult.Balance),
CodeHash: innerResult.CodeHash,
Nonce: hexutil.Uint64(innerResult.Nonce),
StorageHash: innerResult.StorageHash,
StorageProof: make([]StorageResult, 0),
}
err = nil
}
}
}
}()
Expand Down
16 changes: 12 additions & 4 deletions trie/triedb/pathdb/nodebufferlist.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ const (
)

type KeepRecord struct {
BlockID uint64
StateRoot common.Hash
BlockID uint64
StateRoot common.Hash
KeepInterval uint64
}

var _ trienodebuffer = &nodebufferlist{}
Expand Down Expand Up @@ -66,6 +67,7 @@ type nodebufferlist struct {
isFlushing atomic.Bool // Flag indicates writing disk under background.
stopFlushing atomic.Bool // Flag stops writing disk under background.
stopCh chan struct{}
waitStopCh chan struct{}
notifyKeepCh chan *KeepRecord
waitKeepCh chan struct{}
}
Expand Down Expand Up @@ -119,6 +121,7 @@ func newNodeBufferList(
count: 1,
persistID: rawdb.ReadPersistentStateID(db),
stopCh: make(chan struct{}),
waitStopCh: make(chan struct{}),
notifyKeepCh: keepCh,
waitKeepCh: waitKeepCh,
}
Expand Down Expand Up @@ -246,8 +249,10 @@ func (nf *nodebufferlist) flush(db ethdb.KeyValueStore, clean *fastcache.Cache,

notifyKeeperFunc := func(buffer *multiDifflayer) bool {
if buffer.block%nf.wpBlocks == 0 {
nf.notifyKeepCh <- &KeepRecord{BlockID: buffer.block, StateRoot: buffer.root}
keepRecord := &KeepRecord{BlockID: buffer.block, StateRoot: buffer.root, KeepInterval: nf.wpBlocks}
nf.notifyKeepCh <- keepRecord
<-nf.waitKeepCh
log.Info("Succeed to keep proof in force flush", "record", keepRecord)
}
return true
}
Expand Down Expand Up @@ -362,6 +367,7 @@ func (nf *nodebufferlist) getLayers() uint64 {
// waitAndStopFlushing will block unit writing the trie nodes of trienodebuffer to disk.
func (nf *nodebufferlist) waitAndStopFlushing() {
close(nf.stopCh)
<-nf.waitStopCh
nf.stopFlushing.Store(true)
for nf.isFlushing.Load() {
time.Sleep(time.Second)
Expand Down Expand Up @@ -483,7 +489,7 @@ func (nf *nodebufferlist) diffToBase() {
}

if buffer.block%nf.wpBlocks == 0 && nf.notifyKeepCh != nil { // maybe keep proof
nf.notifyKeepCh <- &KeepRecord{BlockID: buffer.block, StateRoot: buffer.root}
nf.notifyKeepCh <- &KeepRecord{BlockID: buffer.block, StateRoot: buffer.root, KeepInterval: nf.wpBlocks}
<-nf.waitKeepCh
}

Expand Down Expand Up @@ -544,6 +550,8 @@ func (nf *nodebufferlist) loop() {
for {
select {
case <-nf.stopCh:
nf.flush(nil, nil, 0, true) // force flush to ensure all proposed-block can be kept by proof keeper
nf.waitStopCh <- struct{}{}
return
case <-mergeTicker.C:
if nf.stopFlushing.Load() {
Expand Down

0 comments on commit 7d99989

Please sign in to comment.