diff --git a/core/blockchain.go b/core/blockchain.go index 37242e3eb4..1459afa575 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -348,7 +348,6 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis if cacheConfig.StateScheme == rawdb.PathScheme && trieConfig.PathDB.TrieNodeBufferType == pathdb.NodeBufferList { opts := &proofKeeperOptions{ enable: true, - keepInterval: trieConfig.PathDB.ProposeBlockInterval, watchStartKeepCh: trieConfig.PathDB.KeepCh, notifyFinishKeepCh: trieConfig.PathDB.WaitKeepCh, rpcClient: cacheConfig.RpcClient, diff --git a/core/proof_keeper.go b/core/proof_keeper.go index 2b671254fa..c6b4966f29 100644 --- a/core/proof_keeper.go +++ b/core/proof_keeper.go @@ -2,9 +2,11 @@ package core import ( "context" + "encoding/binary" "encoding/json" "fmt" "math/big" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" @@ -27,33 +29,40 @@ var ( // keeperMetaRecord is used to ensure proof continuous in scenarios such as enable/disable keeper, interval changes, reorg, etc. // which is stored in kv db. type keeperMetaRecord struct { - blockID uint64 - proofID uint64 - proposedInterval uint64 + BlockID uint64 `json:"blockID"` + ProofID uint64 `json:"proofID"` + KeepInterval uint64 `json:"keepInterval"` } // proofDataRecord is used to store proposed proof data. // which is stored in ancient db. type proofDataRecord struct { - ProofID uint64 `json:"proofID"` - BlockID uint64 `json:"blockID"` - StateRoot common.Hash `json:"stateRoot"` - Address common.Address `json:"address"` - AccountProof []string `json:"accountProof"` - Balance *big.Int `json:"balance"` - CodeHash common.Hash `json:"codeHash"` - Nonce uint64 `json:"nonce"` - StorageHash common.Hash `json:"storageHash"` + ProofID uint64 `json:"proofID"` + BlockID uint64 `json:"blockID"` + StateRoot common.Hash `json:"stateRoot"` + + Address common.Address `json:"address"` + AccountProof []string `json:"accountProof"` + Balance *big.Int `json:"balance"` + CodeHash common.Hash `json:"codeHash"` + Nonce uint64 `json:"nonce"` + StorageHash common.Hash `json:"storageHash"` + StorageProof []common.StorageResult `json:"storageProof"` } +// todo: move metadb to opts. type proofKeeperOptions struct { enable bool - keepInterval uint64 watchStartKeepCh chan *pathdb.KeepRecord notifyFinishKeepCh chan struct{} rpcClient *rpc.Client } +// todo: ensure ancient sync write?? +// add metris +// add ut +// polish log +// todo gc type ProofKeeper struct { opts *proofKeeperOptions keeperMetaDB ethdb.Database @@ -95,6 +104,7 @@ func newProofKeeper(keeperMetaDB ethdb.Database, opts *proofKeeperOptions) (*Pro } func (keeper *ProofKeeper) getInnerProof(kRecord *pathdb.KeepRecord) (*proofDataRecord, error) { + // todo: maybe need retry rawPoof, err := keeper.selfClient.GetProof(context.Background(), l2ToL1MessagePasserAddr, nil, big.NewInt(int64(kRecord.BlockID))) if err != nil { @@ -114,6 +124,7 @@ func (keeper *ProofKeeper) getInnerProof(kRecord *pathdb.KeepRecord) (*proofData CodeHash: rawPoof.CodeHash, Nonce: rawPoof.Nonce, StorageHash: rawPoof.StorageHash, + StorageProof: rawPoof.StorageProof, } log.Info("Succeed to get proof", "proof_record", pRecord) return pRecord, nil @@ -127,11 +138,13 @@ func (keeper *ProofKeeper) eventLoop() { for { select { case keepRecord := <-keeper.opts.watchStartKeepCh: - log.Info("keep proof", "record", keepRecord) + // log.Info("keep proof", "record", keepRecord) var ( hasTruncatedMeta bool curProofID uint64 + startTimestamp time.Time ) + startTimestamp = time.Now() hasTruncatedMeta = keeper.truncateKeeperMetaRecordHeadIfNeeded(keepRecord.BlockID) metaList := keeper.getKeeperMetaRecordList() if len(metaList) == 0 { @@ -152,14 +165,18 @@ func (keeper *ProofKeeper) eventLoop() { if hasTruncatedMeta || !putKeeperMetaRecordOnce { putKeeperMetaRecordOnce = true keeper.putKeeperMetaRecord(&keeperMetaRecord{ - proposedInterval: keeper.opts.keepInterval, - blockID: keepRecord.BlockID, - proofID: curProofID, + BlockID: keepRecord.BlockID, + ProofID: curProofID, + KeepInterval: keepRecord.KeepInterval, }) } proofRecord.ProofID = curProofID keeper.putProofDataRecord(proofRecord) } + log.Info("Keep a new proof", + "block_id", keepRecord.BlockID, + "state_root", keepRecord.StateRoot.String(), + "error", err, "elapsed", common.PrettyDuration(time.Since(startTimestamp))) keeper.opts.notifyFinishKeepCh <- struct{}{} case queryBlockID := <-keeper.queryProofCh: var resultProofRecord *proofDataRecord @@ -169,11 +186,11 @@ func (keeper *ProofKeeper) eventLoop() { index := len(metaList) - 1 for index >= 0 { m := metaList[index] - if queryBlockID >= m.blockID { - if queryBlockID%m.proposedInterval != 0 { // check + if queryBlockID >= m.BlockID { + if m.KeepInterval == 0 || queryBlockID%m.KeepInterval != 0 { // check break } - proofID = m.proofID + (queryBlockID-m.blockID)/m.proposedInterval + proofID = m.ProofID + (queryBlockID-m.BlockID)/m.KeepInterval resultProofRecord = keeper.getProofDataRecord(proofID) break } @@ -187,9 +204,9 @@ func (keeper *ProofKeeper) eventLoop() { // inner util func list // keeper meta func -func (keeper *ProofKeeper) getKeeperMetaRecordList() []keeperMetaRecord { +func (keeper *ProofKeeper) getKeeperMetaRecordList() []*keeperMetaRecord { var ( - metaList []keeperMetaRecord + metaList []*keeperMetaRecord err error iter ethdb.Iterator ) @@ -197,11 +214,18 @@ func (keeper *ProofKeeper) getKeeperMetaRecordList() []keeperMetaRecord { defer iter.Release() for iter.Next() { + keyBlockID := binary.BigEndian.Uint64(iter.Key()[1:]) m := keeperMetaRecord{} if err = json.Unmarshal(iter.Value(), &m); err != nil { + log.Error("Failed to unmarshal keeper meta record", "key_block_id", keyBlockID, "error", err) continue } - metaList = append(metaList, m) + if keyBlockID != m.BlockID { + log.Error("Failed to check consistency between key and value", "key_block_id", keyBlockID, "value_block_id", m.BlockID) + continue + } + log.Info("Keep meta", "key_block_id", keyBlockID, "meta_record", m) + metaList = append(metaList, &m) } log.Info("Succeed to get meta list", "list", metaList) return metaList @@ -224,9 +248,9 @@ func (keeper *ProofKeeper) truncateKeeperMetaRecordHeadIfNeeded(blockID uint64) if err = json.Unmarshal(iter.Value(), &m); err != nil { continue } - if m.blockID >= blockID { + if m.BlockID >= blockID { hasTruncated = true - rawdb.DeleteKeeperMeta(batch, m.blockID) + rawdb.DeleteKeeperMeta(batch, m.BlockID) } } @@ -243,8 +267,8 @@ func (keeper *ProofKeeper) putKeeperMetaRecord(m *keeperMetaRecord) { if err != nil { log.Crit("Failed to marshal keeper meta record", "err", err) } - rawdb.PutKeeperMeta(keeper.keeperMetaDB, m.blockID, meta) - log.Info("Succeed to add keeper meta", "record", m) + rawdb.PutKeeperMeta(keeper.keeperMetaDB, m.BlockID, meta) + log.Info("Succeed to put keeper meta", "record", m) } @@ -314,7 +338,7 @@ func (keeper *ProofKeeper) putProofDataRecord(p *proofDataRecord) { log.Crit("Failed to marshal proof data", "err", err) } rawdb.PutProofData(keeper.proofDataDB, p.ProofID, proof) - log.Info("Succeed to add proof data", "record", p) + log.Info("Succeed to put proof data", "record", p) } // IsProposeProofQuery is used to determine whether it is proposed proof. @@ -335,27 +359,45 @@ func (keeper *ProofKeeper) IsProposeProofQuery(address common.Address, storageKe // QueryProposeProof is used to get proof which is stored in ancient proof. func (keeper *ProofKeeper) QueryProposeProof(blockID uint64, stateRoot common.Hash) (*common.AccountResult, error) { + var ( + result *common.AccountResult + err error + startTimestamp time.Time + ) + startTimestamp = time.Now() + defer func() { + log.Info("Query propose proof", + "block_id", blockID, + "state_root", stateRoot.String(), + "error", err, "elapsed", common.PrettyDuration(time.Since(startTimestamp))) + }() keeper.queryProofCh <- blockID resultProofRecord := <-keeper.waitQueryProofCh if resultProofRecord == nil { // Maybe the keeper was disabled for a certain period of time before. - return nil, fmt.Errorf("proof is not found") + err = fmt.Errorf("proof is not found, block_id=%d", blockID) + return nil, err } if resultProofRecord.BlockID != blockID { - // Maybe the keeper was disabled for a certain period of time before. - return nil, fmt.Errorf("proof is not found due to block is mismatch") + // Maybe expected_block_id proof is not kept due to disabled or some bug + err = fmt.Errorf("proof is not found due to block is mismatch, expected_block_id=%d, actual_block_id=%d", + blockID, resultProofRecord.BlockID) + return nil, err } if resultProofRecord.StateRoot.Cmp(stateRoot) != 0 { // Impossible, unless there is a bug. - return nil, fmt.Errorf("proof is not found due to state root is mismatch") + err = fmt.Errorf("proof is not found due to state root is mismatch, expected_state_root=%s, actual_state_root=%s", + stateRoot.String(), resultProofRecord.StateRoot.String()) + return nil, err } - return &common.AccountResult{ + result = &common.AccountResult{ Address: resultProofRecord.Address, AccountProof: resultProofRecord.AccountProof, Balance: resultProofRecord.Balance, CodeHash: resultProofRecord.CodeHash, Nonce: resultProofRecord.Nonce, StorageHash: resultProofRecord.StorageHash, - StorageProof: nil, - }, nil + StorageProof: resultProofRecord.StorageProof, + } + return result, nil } diff --git a/core/rawdb/accessors_proof.go b/core/rawdb/accessors_proof.go index bbe47f63a8..fb97d3fc1f 100644 --- a/core/rawdb/accessors_proof.go +++ b/core/rawdb/accessors_proof.go @@ -9,6 +9,9 @@ const ( blockNumberLength = 8 // uint64 is 8bytes ) +// todo: cannot panic +// todo: more tips +// todo: inspect proof ancient // Keeper Meta func IterateKeeperMeta(db ethdb.Iteratee) ethdb.Iterator { return NewKeyLengthIterator(db.NewIterator(proofKeeperMetaPrefix, nil), len(proofKeeperMetaPrefix)+blockNumberLength) diff --git a/eth/api_backend.go b/eth/api_backend.go index 53da78b607..b34601c581 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -214,7 +214,7 @@ func (b *EthAPIBackend) StateAndHeaderByNumber(ctx context.Context, number rpc.B } stateDb, err := b.eth.BlockChain().StateAt(header.Root) if err != nil { - return nil, nil, err + return nil, header, err } return stateDb, header, nil } @@ -236,7 +236,7 @@ func (b *EthAPIBackend) StateAndHeaderByNumberOrHash(ctx context.Context, blockN } stateDb, err := b.eth.BlockChain().StateAt(header.Root) if err != nil { - return nil, nil, err + return nil, header, err } return stateDb, header, nil } diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go index aae32ef4b7..448c6ec45e 100644 --- a/internal/ethapi/api.go +++ b/internal/ethapi/api.go @@ -677,12 +677,8 @@ type StorageResult struct { } // GetProof returns the Merkle-proof for a given account and optionally some storage keys. -func (s *BlockChainAPI) GetProof(ctx context.Context, address common.Address, storageKeys []string, blockNrOrHash rpc.BlockNumberOrHash) (*AccountResult, error) { - var ( - err error - header *types.Header - ) - header, err = headerByNumberOrHash(ctx, s.b, blockNrOrHash) +func (s *BlockChainAPI) GetProof(ctx context.Context, address common.Address, storageKeys []string, blockNrOrHash rpc.BlockNumberOrHash) (result *AccountResult, err error) { + header, err := headerByNumberOrHash(ctx, s.b, blockNrOrHash) if err != nil { return nil, err } @@ -700,9 +696,20 @@ func (s *BlockChainAPI) GetProof(ctx context.Context, address common.Address, st } defer func() { - if proofKeeper := s.b.GetProofKeeper(); err != nil && proofKeeper != nil { + if proofKeeper := s.b.GetProofKeeper(); err != nil && proofKeeper != nil && header.Number != nil { if proofKeeper.IsProposeProofQuery(address, storageKeys, header.Number.Uint64()) { - // todo + if innerResult, innerError := proofKeeper.QueryProposeProof(header.Number.Uint64(), header.Root); innerError == nil { + result = &AccountResult{ + Address: innerResult.Address, + AccountProof: innerResult.AccountProof, + Balance: (*hexutil.Big)(innerResult.Balance), + CodeHash: innerResult.CodeHash, + Nonce: hexutil.Uint64(innerResult.Nonce), + StorageHash: innerResult.StorageHash, + StorageProof: make([]StorageResult, 0), + } + err = nil + } } } }() diff --git a/trie/triedb/pathdb/nodebufferlist.go b/trie/triedb/pathdb/nodebufferlist.go index 29050c0318..1638bd287a 100644 --- a/trie/triedb/pathdb/nodebufferlist.go +++ b/trie/triedb/pathdb/nodebufferlist.go @@ -30,8 +30,9 @@ const ( ) type KeepRecord struct { - BlockID uint64 - StateRoot common.Hash + BlockID uint64 + StateRoot common.Hash + KeepInterval uint64 } var _ trienodebuffer = &nodebufferlist{} @@ -66,6 +67,7 @@ type nodebufferlist struct { isFlushing atomic.Bool // Flag indicates writing disk under background. stopFlushing atomic.Bool // Flag stops writing disk under background. stopCh chan struct{} + waitStopCh chan struct{} notifyKeepCh chan *KeepRecord waitKeepCh chan struct{} } @@ -119,6 +121,7 @@ func newNodeBufferList( count: 1, persistID: rawdb.ReadPersistentStateID(db), stopCh: make(chan struct{}), + waitStopCh: make(chan struct{}), notifyKeepCh: keepCh, waitKeepCh: waitKeepCh, } @@ -246,8 +249,10 @@ func (nf *nodebufferlist) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, notifyKeeperFunc := func(buffer *multiDifflayer) bool { if buffer.block%nf.wpBlocks == 0 { - nf.notifyKeepCh <- &KeepRecord{BlockID: buffer.block, StateRoot: buffer.root} + keepRecord := &KeepRecord{BlockID: buffer.block, StateRoot: buffer.root, KeepInterval: nf.wpBlocks} + nf.notifyKeepCh <- keepRecord <-nf.waitKeepCh + log.Info("Succeed to keep proof in force flush", "record", keepRecord) } return true } @@ -362,6 +367,7 @@ func (nf *nodebufferlist) getLayers() uint64 { // waitAndStopFlushing will block unit writing the trie nodes of trienodebuffer to disk. func (nf *nodebufferlist) waitAndStopFlushing() { close(nf.stopCh) + <-nf.waitStopCh nf.stopFlushing.Store(true) for nf.isFlushing.Load() { time.Sleep(time.Second) @@ -483,7 +489,7 @@ func (nf *nodebufferlist) diffToBase() { } if buffer.block%nf.wpBlocks == 0 && nf.notifyKeepCh != nil { // maybe keep proof - nf.notifyKeepCh <- &KeepRecord{BlockID: buffer.block, StateRoot: buffer.root} + nf.notifyKeepCh <- &KeepRecord{BlockID: buffer.block, StateRoot: buffer.root, KeepInterval: nf.wpBlocks} <-nf.waitKeepCh } @@ -544,6 +550,8 @@ func (nf *nodebufferlist) loop() { for { select { case <-nf.stopCh: + nf.flush(nil, nil, 0, true) // force flush to ensure all proposed-block can be kept by proof keeper + nf.waitStopCh <- struct{}{} return case <-mergeTicker.C: if nf.stopFlushing.Load() {