From 09eee0a4ed4029cf1f9afdc265d2d82c61fbfbd4 Mon Sep 17 00:00:00 2001 From: "will@2012" Date: Wed, 17 Apr 2024 18:16:34 +0800 Subject: [PATCH] feat: add keeper schedule skeleton --- common/types.go | 31 ++++++++ core/blockchain.go | 62 +++++++++++----- core/proof_keeper.go | 105 +++++++++++++++++++++++++++ core/proof_keeper_test.go | 1 + core/rawdb/ancient_scheme.go | 14 ++++ eth/backend.go | 1 + ethclient/ethclient.go | 21 ++++++ ethclient/gethclient/gethclient.go | 26 +------ internal/ethapi/api.go | 17 +---- trie/triedb/pathdb/database.go | 4 +- trie/triedb/pathdb/disklayer.go | 12 ++- trie/triedb/pathdb/journal.go | 4 +- trie/triedb/pathdb/nodebufferlist.go | 56 +++++++++++--- 13 files changed, 280 insertions(+), 74 deletions(-) create mode 100644 core/proof_keeper.go create mode 100644 core/proof_keeper_test.go diff --git a/common/types.go b/common/types.go index aadca87f82..8dd7f3ff77 100644 --- a/common/types.go +++ b/common/types.go @@ -475,3 +475,34 @@ func (d *Decimal) UnmarshalJSON(input []byte) error { return err } } + +// ProofList implements ethdb.KeyValueWriter and collects the proofs as +// hex-strings for delivery to rpc-caller. +type ProofList []string + +func (n *ProofList) Put(key []byte, value []byte) error { + *n = append(*n, hexutil.Encode(value)) + return nil +} + +func (n *ProofList) Delete(key []byte) error { + panic("not supported") +} + +// AccountResult is the result of a GetProof operation. +type AccountResult struct { + Address Address `json:"address"` + AccountProof []string `json:"accountProof"` + Balance *big.Int `json:"balance"` + CodeHash Hash `json:"codeHash"` + Nonce uint64 `json:"nonce"` + StorageHash Hash `json:"storageHash"` + StorageProof []StorageResult `json:"storageProof"` +} + +// StorageResult provides a proof for a key-value pair. +type StorageResult struct { + Key string `json:"key"` + Value *big.Int `json:"value"` + Proof []string `json:"proof"` +} diff --git a/core/blockchain.go b/core/blockchain.go index 1dd7490ee3..08e8f1a2b0 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -47,6 +47,7 @@ import ( "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/trie" "github.com/ethereum/go-ethereum/trie/triedb/hashdb" "github.com/ethereum/go-ethereum/trie/triedb/pathdb" @@ -83,13 +84,13 @@ var ( blockExecutionTimer = metrics.NewRegisteredTimer("chain/execution", nil) blockWriteTimer = metrics.NewRegisteredTimer("chain/write", nil) - blockWriteExternalTimer = metrics.NewRegisteredTimer("chain/block/write/external", nil) - stateCommitExternalTimer = metrics.NewRegisteredTimer("chain/state/commit/external", nil) + blockWriteExternalTimer = metrics.NewRegisteredTimer("chain/block/write/external", nil) + stateCommitExternalTimer = metrics.NewRegisteredTimer("chain/state/commit/external", nil) triedbCommitExternalTimer = metrics.NewRegisteredTimer("chain/triedb/commit/external", nil) - innerExecutionTimer = metrics.NewRegisteredTimer("chain/inner/execution", nil) + innerExecutionTimer = metrics.NewRegisteredTimer("chain/inner/execution", nil) blockGasUsedGauge = metrics.NewRegisteredGauge("chain/block/gas/used", nil) - mgaspsGauge = metrics.NewRegisteredGauge("chain/mgas/ps", nil) + mgaspsGauge = metrics.NewRegisteredGauge("chain/mgas/ps", nil) blockReorgMeter = metrics.NewRegisteredMeter("chain/reorg/executes", nil) blockReorgAddMeter = metrics.NewRegisteredMeter("chain/reorg/add", nil) @@ -160,6 +161,7 @@ type CacheConfig struct { SnapshotWait bool // Wait for snapshot construction on startup. TODO(karalabe): This is a dirty hack for testing, nuke it TrieCommitInterval uint64 // Define a block height interval, commit trie every TrieCommitInterval block height. + RpcClient *rpc.Client } // triedbConfig derives the configures for trie database. @@ -177,6 +179,8 @@ func (c *CacheConfig) triedbConfig() *trie.Config { CleanCacheSize: c.TrieCleanLimit * 1024 * 1024, DirtyCacheSize: c.TrieDirtyLimit * 1024 * 1024, ProposeBlockInterval: c.ProposeBlockInterval, + KeepCh: make(chan *pathdb.KeepRecord), + WaitKeepCh: make(chan struct{}), } } return config @@ -278,6 +282,8 @@ type BlockChain struct { processor Processor // Block transaction processor interface forker *ForkChoice vmConfig vm.Config + + proofKeeper *proofKeeper } // NewBlockChain returns a fully initialised block chain using information @@ -287,8 +293,10 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis if cacheConfig == nil { cacheConfig = defaultCacheConfig } + // Open trie database with provided config - triedb := trie.NewDatabase(db, cacheConfig.triedbConfig()) + trieConfig := cacheConfig.triedbConfig() + triedb := trie.NewDatabase(db, trieConfig) // Setup the genesis block, commit the provided genesis specification // to database if the genesis block is not present yet, or load the @@ -337,6 +345,20 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis bc.processor = NewStateProcessor(chainConfig, bc, engine) var err error + if cacheConfig.StateScheme == rawdb.PathScheme && trieConfig.PathDB.TrieNodeBufferType == pathdb.NodeBufferList { + opts := &proofKeeperOptions{ + enable: true, + keepInterval: trieConfig.PathDB.ProposeBlockInterval, + watchStartKeepCh: trieConfig.PathDB.KeepCh, + notifyFinishKeepCh: trieConfig.PathDB.WaitKeepCh, + rpcClient: cacheConfig.RpcClient, + } + bc.proofKeeper, err = newProofKeeper(db, opts) + if err != nil { + return nil, err + } + } + bc.hc, err = NewHeaderChain(db, chainConfig, engine, bc.insertStopped) if err != nil { return nil, err @@ -1460,7 +1482,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. } blockWriteExternalTimer.UpdateSince(start) log.Debug("blockWriteExternalTimer", "duration", common.PrettyDuration(time.Since(start)), "hash", block.Hash()) - + // Commit all cached state changes into underlying memory database. start = time.Now() root, err := state.Commit(block.NumberU64(), bc.chainConfig.IsEIP158(block.Number())) @@ -1477,10 +1499,10 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. } // If we're running an archive node, always flush start = time.Now() - defer func () { + defer func() { triedbCommitExternalTimer.UpdateSince(start) log.Debug("triedbCommitExternalTimer", "duration", common.PrettyDuration(time.Since(start)), "hash", block.Hash()) - } () + }() if bc.cacheConfig.TrieDirtyDisabled { return bc.triedb.Commit(root, false) } @@ -1785,7 +1807,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) } }() - defer func () { + defer func() { DebugInnerExecutionDuration = 0 }() for ; block != nil && err == nil || errors.Is(err, ErrKnownBlock); block, err = it.next() { @@ -1908,16 +1930,16 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) proctime := time.Since(start) // processing + validation // Update the metrics touched during block processing and validation - accountReadTimer.Update(statedb.AccountReads) // Account reads are complete(in processing) - storageReadTimer.Update(statedb.StorageReads) // Storage reads are complete(in processing) - snapshotAccountReadTimer.Update(statedb.SnapshotAccountReads) // Account reads are complete(in processing) - snapshotStorageReadTimer.Update(statedb.SnapshotStorageReads) // Storage reads are complete(in processing) - accountUpdateTimer.Update(statedb.AccountUpdates) // Account updates are complete(in validation) - storageUpdateTimer.Update(statedb.StorageUpdates) // Storage updates are complete(in validation) - accountHashTimer.Update(statedb.AccountHashes) // Account hashes are complete(in validation) - storageHashTimer.Update(statedb.StorageHashes) // Storage hashes are complete(in validation) - blockExecutionTimer.Update(ptime) // The time spent on block execution - blockValidationTimer.Update(vtime) // The time spent on block validation + accountReadTimer.Update(statedb.AccountReads) // Account reads are complete(in processing) + storageReadTimer.Update(statedb.StorageReads) // Storage reads are complete(in processing) + snapshotAccountReadTimer.Update(statedb.SnapshotAccountReads) // Account reads are complete(in processing) + snapshotStorageReadTimer.Update(statedb.SnapshotStorageReads) // Storage reads are complete(in processing) + accountUpdateTimer.Update(statedb.AccountUpdates) // Account updates are complete(in validation) + storageUpdateTimer.Update(statedb.StorageUpdates) // Storage updates are complete(in validation) + accountHashTimer.Update(statedb.AccountHashes) // Account hashes are complete(in validation) + storageHashTimer.Update(statedb.StorageHashes) // Storage hashes are complete(in validation) + blockExecutionTimer.Update(ptime) // The time spent on block execution + blockValidationTimer.Update(vtime) // The time spent on block validation innerExecutionTimer.Update(DebugInnerExecutionDuration) @@ -1959,7 +1981,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) } trieDiffNodes, trieBufNodes, trieImmutableBufNodes, _ := bc.triedb.Size() stats.report(chain, it.index, snapDiffItems, snapBufItems, trieDiffNodes, trieBufNodes, trieImmutableBufNodes, setHead) - blockGasUsedGauge.Update(int64(block.GasUsed())/1000000) + blockGasUsedGauge.Update(int64(block.GasUsed()) / 1000000) if !setHead { // After merge we expect few side chains. Simply count diff --git a/core/proof_keeper.go b/core/proof_keeper.go new file mode 100644 index 0000000000..ae0743903e --- /dev/null +++ b/core/proof_keeper.go @@ -0,0 +1,105 @@ +package core + +import ( + "context" + "math/big" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/ethclient" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/rpc" + "github.com/ethereum/go-ethereum/trie/triedb/pathdb" +) + +const ( + l2ToL1MessagePasser = "0x4200000000000000000000000000000000000016" +) + +var ( + l2ToL1MessagePasserAddr = common.HexToAddress(l2ToL1MessagePasser) +) + +type proofDataRecord struct { + Address common.Address `json:"address"` + AccountProof []string `json:"accountProof"` + Balance *big.Int `json:"balance"` + CodeHash common.Hash `json:"codeHash"` + Nonce uint64 `json:"nonce"` + StorageHash common.Hash `json:"storageHash"` +} + +type proofKeeperOptions struct { + enable bool + keepInterval uint64 + watchStartKeepCh chan *pathdb.KeepRecord + notifyFinishKeepCh chan struct{} + rpcClient *rpc.Client +} + +type proofKeeper struct { + opts *proofKeeperOptions + keeperMetaDB ethdb.Database + proofDataDB *rawdb.ResettableFreezer + selfClient *ethclient.Client +} + +func newProofKeeper(keeperMetaDB ethdb.Database, opts *proofKeeperOptions) (*proofKeeper, error) { + var ( + err error + ancientDir string + keeper *proofKeeper + ) + + if ancientDir, err = keeperMetaDB.AncientDatadir(); err != nil { + log.Error("Failed to get ancient data dir", "error", err) + return nil, err + } + keeper = &proofKeeper{ + opts: opts, + keeperMetaDB: keeperMetaDB, + selfClient: ethclient.NewClient(opts.rpcClient), + } + if keeper.proofDataDB, err = rawdb.NewProofFreezer(ancientDir, false); err != nil { + log.Error("Failed to new proof ancient freezer", "error", err) + return nil, err + } + + go keeper.eventLoop() + + log.Info("Succeed to init proof keeper", "options", opts) + return keeper, nil +} + +func (keeper *proofKeeper) queryProposedProof(kRecord *pathdb.KeepRecord) (*proofDataRecord, error) { + rawPoof, err := keeper.selfClient.GetProof(context.Background(), l2ToL1MessagePasserAddr, nil, hexutil.EncodeUint64(kRecord.BlockID)) + // rawPoof, err := keeper.selfClient.GetProof(context.Background(), l2ToL1MessagePasserAddr, nil, strconv.FormatUint(kRecord.BlockID, 10)) + + if err != nil { + log.Error("Failed to get proof", "block_id", kRecord.BlockID, "hex_block_id", hexutil.EncodeUint64(kRecord.BlockID), "error", err) + return nil, err + } + pRecord := &proofDataRecord{ + Address: rawPoof.Address, + AccountProof: rawPoof.AccountProof, + Balance: rawPoof.Balance, + CodeHash: rawPoof.CodeHash, + Nonce: rawPoof.Nonce, + StorageHash: rawPoof.StorageHash, + } + log.Info("Succeed to get proof", "proof_record", pRecord) + return pRecord, nil +} + +func (keeper *proofKeeper) eventLoop() { + for { + select { + case r := <-keeper.opts.watchStartKeepCh: + log.Info("keep proof", "record", r) + keeper.queryProposedProof(r) + keeper.opts.notifyFinishKeepCh <- struct{}{} + } + } +} diff --git a/core/proof_keeper_test.go b/core/proof_keeper_test.go new file mode 100644 index 0000000000..9a8bc9592b --- /dev/null +++ b/core/proof_keeper_test.go @@ -0,0 +1 @@ +package core diff --git a/core/rawdb/ancient_scheme.go b/core/rawdb/ancient_scheme.go index 6f409fff1d..736667a22f 100644 --- a/core/rawdb/ancient_scheme.go +++ b/core/rawdb/ancient_scheme.go @@ -66,10 +66,19 @@ var stateFreezerNoSnappy = map[string]bool{ stateHistoryStorageData: false, } +const ( + proposeProofTable = "propose_proof" +) + +var proofFreezerNoSnappy = map[string]bool{ + proposeProofTable: true, +} + // The list of identifiers of ancient stores. var ( chainFreezerName = "chain" // the folder name of chain segment ancient store. stateFreezerName = "state" // the folder name of reverse diff ancient store. + proofFreezerName = "proof" // the folder name of propose withdraw proof store ) // freezers the collections of all builtin freezers. @@ -79,3 +88,8 @@ var freezers = []string{chainFreezerName, stateFreezerName} func NewStateFreezer(ancientDir string, readOnly bool) (*ResettableFreezer, error) { return NewResettableFreezer(filepath.Join(ancientDir, stateFreezerName), "eth/db/state", readOnly, stateHistoryTableSize, stateFreezerNoSnappy) } + +// NewProofFreezer initializes the freezer for propose withdraw proof. +func NewProofFreezer(ancientDir string, readOnly bool) (*ResettableFreezer, error) { + return NewResettableFreezer(filepath.Join(ancientDir, proofFreezerName), "eth/db/proof", readOnly, stateHistoryTableSize, proofFreezerNoSnappy) +} diff --git a/eth/backend.go b/eth/backend.go index 765b96caa5..99166c01d3 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -209,6 +209,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { TrieCommitInterval: config.TrieCommitInterval, PathNodeBuffer: config.PathNodeBuffer, ProposeBlockInterval: config.ProposeBlockInterval, + RpcClient: stack.Attach(), } ) // Override the chain config with provided settings. diff --git a/ethclient/ethclient.go b/ethclient/ethclient.go index af373b9938..e96f565c2b 100644 --- a/ethclient/ethclient.go +++ b/ethclient/ethclient.go @@ -680,3 +680,24 @@ func (p *rpcProgress) toSyncProgress() *ethereum.SyncProgress { HealingBytecode: uint64(p.HealingBytecode), } } + +// todo: +func (ec *Client) GetProof(ctx context.Context, address common.Address, storage []common.Hash, blockTag string) (*common.AccountResult, error) { + var getProofResponse *common.AccountResult + err := ec.c.CallContext(ctx, &getProofResponse, "eth_getProof", address, storage, blockTag) + if err != nil { + return nil, err + } + if getProofResponse == nil { + return nil, ethereum.NotFound + } + if len(getProofResponse.StorageProof) != len(storage) { + return nil, fmt.Errorf("missing storage proof data, got %d proof entries but requested %d storage keys", len(getProofResponse.StorageProof), len(storage)) + } + for i, key := range storage { + if key.String() != getProofResponse.StorageProof[i].Key { + return nil, fmt.Errorf("unexpected storage proof key difference for entry %d: got %s but requested %s", i, getProofResponse.StorageProof[i].Key, key) + } + } + return getProofResponse, nil +} diff --git a/ethclient/gethclient/gethclient.go b/ethclient/gethclient/gethclient.go index e2c0ef3ed0..5cb703ea5b 100644 --- a/ethclient/gethclient/gethclient.go +++ b/ethclient/gethclient/gethclient.go @@ -60,27 +60,9 @@ func (ec *Client) CreateAccessList(ctx context.Context, msg ethereum.CallMsg) (* return result.Accesslist, uint64(result.GasUsed), result.Error, nil } -// AccountResult is the result of a GetProof operation. -type AccountResult struct { - Address common.Address `json:"address"` - AccountProof []string `json:"accountProof"` - Balance *big.Int `json:"balance"` - CodeHash common.Hash `json:"codeHash"` - Nonce uint64 `json:"nonce"` - StorageHash common.Hash `json:"storageHash"` - StorageProof []StorageResult `json:"storageProof"` -} - -// StorageResult provides a proof for a key-value pair. -type StorageResult struct { - Key string `json:"key"` - Value *big.Int `json:"value"` - Proof []string `json:"proof"` -} - // GetProof returns the account and storage values of the specified account including the Merkle-proof. // The block number can be nil, in which case the value is taken from the latest known block. -func (ec *Client) GetProof(ctx context.Context, account common.Address, keys []string, blockNumber *big.Int) (*AccountResult, error) { +func (ec *Client) GetProof(ctx context.Context, account common.Address, keys []string, blockNumber *big.Int) (*common.AccountResult, error) { type storageResult struct { Key string `json:"key"` Value *hexutil.Big `json:"value"` @@ -105,15 +87,15 @@ func (ec *Client) GetProof(ctx context.Context, account common.Address, keys []s var res accountResult err := ec.c.CallContext(ctx, &res, "eth_getProof", account, keys, toBlockNumArg(blockNumber)) // Turn hexutils back to normal datatypes - storageResults := make([]StorageResult, 0, len(res.StorageProof)) + storageResults := make([]common.StorageResult, 0, len(res.StorageProof)) for _, st := range res.StorageProof { - storageResults = append(storageResults, StorageResult{ + storageResults = append(storageResults, common.StorageResult{ Key: st.Key, Value: st.Value.ToInt(), Proof: st.Proof, }) } - result := AccountResult{ + result := common.AccountResult{ Address: res.Address, AccountProof: res.AccountProof, Balance: res.Balance.ToInt(), diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go index 9a88d71fd9..dd5103d8d5 100644 --- a/internal/ethapi/api.go +++ b/internal/ethapi/api.go @@ -676,19 +676,6 @@ type StorageResult struct { Proof []string `json:"proof"` } -// proofList implements ethdb.KeyValueWriter and collects the proofs as -// hex-strings for delivery to rpc-caller. -type proofList []string - -func (n *proofList) Put(key []byte, value []byte) error { - *n = append(*n, hexutil.Encode(value)) - return nil -} - -func (n *proofList) Delete(key []byte) error { - panic("not supported") -} - // GetProof returns the Merkle-proof for a given account and optionally some storage keys. func (s *BlockChainAPI) GetProof(ctx context.Context, address common.Address, storageKeys []string, blockNrOrHash rpc.BlockNumberOrHash) (*AccountResult, error) { header, err := headerByNumberOrHash(ctx, s.b, blockNrOrHash) @@ -753,7 +740,7 @@ func (s *BlockChainAPI) GetProof(ctx context.Context, address common.Address, st storageProof[i] = StorageResult{outputKey, &hexutil.Big{}, []string{}} continue } - var proof proofList + var proof common.ProofList if err := storageTrie.Prove(crypto.Keccak256(key.Bytes()), &proof); err != nil { return nil, err } @@ -766,7 +753,7 @@ func (s *BlockChainAPI) GetProof(ctx context.Context, address common.Address, st if err != nil { return nil, err } - var accountProof proofList + var accountProof common.ProofList if err := tr.Prove(crypto.Keccak256(address.Bytes()), &accountProof); err != nil { return nil, err } diff --git a/trie/triedb/pathdb/database.go b/trie/triedb/pathdb/database.go index 1d22189068..854d8763f5 100644 --- a/trie/triedb/pathdb/database.go +++ b/trie/triedb/pathdb/database.go @@ -100,6 +100,8 @@ type Config struct { DirtyCacheSize int // Maximum memory allowance (in bytes) for caching dirty nodes ReadOnly bool // Flag whether the database is opened in read only mode. ProposeBlockInterval uint64 // Propose block to L1 block interval. + KeepCh chan *KeepRecord + WaitKeepCh chan struct{} } // sanitize checks the provided user configurations and changes anything that's @@ -318,7 +320,7 @@ func (db *Database) Enable(root common.Hash) error { } // Re-construct a new disk layer backed by persistent state // with **empty clean cache and node buffer**. - nb := NewTrieNodeBuffer(db.diskdb, db.config.TrieNodeBufferType, db.bufferSize, nil, 0, db.config.ProposeBlockInterval) + nb := NewTrieNodeBuffer(db.diskdb, db.config.TrieNodeBufferType, db.bufferSize, nil, 0, db.config.ProposeBlockInterval, db.config.KeepCh, db.config.WaitKeepCh) dl := newDiskLayer(root, 0, db, nil, nb) nb.setClean(dl.cleans) db.tree.reset(dl) diff --git a/trie/triedb/pathdb/disklayer.go b/trie/triedb/pathdb/disklayer.go index 5804cbcd11..20e601806b 100644 --- a/trie/triedb/pathdb/disklayer.go +++ b/trie/triedb/pathdb/disklayer.go @@ -113,11 +113,19 @@ func GetNodeBufferType(name string) NodeBufferType { return nodeBufferStringToType[name] } -func NewTrieNodeBuffer(db ethdb.Database, trieNodeBufferType NodeBufferType, limit int, nodes map[common.Hash]map[string]*trienode.Node, layers, proposeBlockInterval uint64) trienodebuffer { +func NewTrieNodeBuffer( + db ethdb.Database, + trieNodeBufferType NodeBufferType, + limit int, + nodes map[common.Hash]map[string]*trienode.Node, + layers, proposeBlockInterval uint64, + keepCh chan *KeepRecord, + waitKeepCh chan struct{}, +) trienodebuffer { log.Info("init trie node buffer", "type", nodeBufferTypeToString[trieNodeBufferType]) switch trieNodeBufferType { case NodeBufferList: - return newNodeBufferList(db, uint64(limit), nodes, layers, proposeBlockInterval) + return newNodeBufferList(db, uint64(limit), nodes, layers, proposeBlockInterval, keepCh, waitKeepCh) case AsyncNodeBuffer: return newAsyncNodeBuffer(limit, nodes, layers) case SyncNodeBuffer: diff --git a/trie/triedb/pathdb/journal.go b/trie/triedb/pathdb/journal.go index 98fe99390d..af6d0f60a6 100644 --- a/trie/triedb/pathdb/journal.go +++ b/trie/triedb/pathdb/journal.go @@ -130,7 +130,7 @@ func (db *Database) loadLayers() layer { log.Info("Failed to load journal, discard it", "err", err) } // Return single layer with persistent state. - nb := NewTrieNodeBuffer(db.diskdb, db.config.TrieNodeBufferType, db.bufferSize, nil, 0, db.config.ProposeBlockInterval) + nb := NewTrieNodeBuffer(db.diskdb, db.config.TrieNodeBufferType, db.bufferSize, nil, 0, db.config.ProposeBlockInterval, db.config.KeepCh, db.config.WaitKeepCh) dl := newDiskLayer(root, rawdb.ReadPersistentStateID(db.diskdb), db, nil, nb) nb.setClean(dl.cleans) return dl @@ -173,7 +173,7 @@ func (db *Database) loadDiskLayer(r *rlp.Stream) (layer, error) { nodes[entry.Owner] = subset } // Calculate the internal state transitions by id difference. - nb := NewTrieNodeBuffer(db.diskdb, db.config.TrieNodeBufferType, db.bufferSize, nodes, id-stored, db.config.ProposeBlockInterval) + nb := NewTrieNodeBuffer(db.diskdb, db.config.TrieNodeBufferType, db.bufferSize, nodes, id-stored, db.config.ProposeBlockInterval, db.config.KeepCh, db.config.WaitKeepCh) base := newDiskLayer(root, id, db, nil, nb) nb.setClean(base.cleans) return base, nil diff --git a/trie/triedb/pathdb/nodebufferlist.go b/trie/triedb/pathdb/nodebufferlist.go index 2a344003be..99863c6738 100644 --- a/trie/triedb/pathdb/nodebufferlist.go +++ b/trie/triedb/pathdb/nodebufferlist.go @@ -29,6 +29,12 @@ const ( DefaultReserveMultiDifflayerNumber = 3 ) +type KeepRecord struct { + BlockID uint64 + //StateRoot common.Hash + //Reader layer +} + var _ trienodebuffer = &nodebufferlist{} // nodebufferlist implements the trienodebuffer interface, it is designed to meet @@ -61,6 +67,8 @@ type nodebufferlist struct { isFlushing atomic.Bool // Flag indicates writing disk under background. stopFlushing atomic.Bool // Flag stops writing disk under background. stopCh chan struct{} + notifyKeepCh chan *KeepRecord + waitKeepCh chan struct{} } // newNodeBufferList initializes the node buffer list with the provided nodes @@ -69,7 +77,9 @@ func newNodeBufferList( limit uint64, nodes map[common.Hash]map[string]*trienode.Node, layers uint64, - proposeBlockInterval uint64) *nodebufferlist { + proposeBlockInterval uint64, + keepCh chan *KeepRecord, + waitKeepCh chan struct{}) *nodebufferlist { var ( rsevMdNum uint64 dlInMd uint64 @@ -99,17 +109,19 @@ func newNodeBufferList( base := newMultiDifflayer(limit, size, common.Hash{}, nodes, layers) ele := newMultiDifflayer(limit, 0, common.Hash{}, make(map[common.Hash]map[string]*trienode.Node), 0) nf := &nodebufferlist{ - db: db, - wpBlocks: wpBlocks, - rsevMdNum: rsevMdNum, - dlInMd: dlInMd, - limit: limit, - base: base, - head: ele, - tail: ele, - count: 1, - persistID: rawdb.ReadPersistentStateID(db), - stopCh: make(chan struct{}), + db: db, + wpBlocks: wpBlocks, + rsevMdNum: rsevMdNum, + dlInMd: dlInMd, + limit: limit, + base: base, + head: ele, + tail: ele, + count: 1, + persistID: rawdb.ReadPersistentStateID(db), + stopCh: make(chan struct{}), + notifyKeepCh: keepCh, + waitKeepCh: waitKeepCh, } go nf.loop() @@ -230,6 +242,21 @@ func (nf *nodebufferlist) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, return nil } + if nf.notifyKeepCh != nil { // maybe keep proof + nf.mux.RLock() + + notifyKeeperFunc := func(buffer *multiDifflayer) bool { + if buffer.block%nf.wpBlocks == 0 { + nf.notifyKeepCh <- &KeepRecord{BlockID: buffer.block} + <-nf.waitKeepCh + } + return true + } + nf.traverseReverse(notifyKeeperFunc) + + nf.mux.RUnlock() + } + // hang user read/write and background write nf.mux.Lock() nf.baseMux.Lock() @@ -456,6 +483,11 @@ func (nf *nodebufferlist) diffToBase() { log.Crit("committed block number misaligned", "block", buffer.block) } + if buffer.block%nf.wpBlocks == 0 && nf.notifyKeepCh != nil { // maybe keep proof + nf.notifyKeepCh <- &KeepRecord{BlockID: buffer.block} + <-nf.waitKeepCh + } + nf.baseMux.Lock() err := nf.base.commit(buffer.root, buffer.id, buffer.block, buffer.layers, buffer.nodes) nf.baseMux.Unlock()