Skip to content

Commit

Permalink
feat: add keeper schedule skeleton
Browse files Browse the repository at this point in the history
  • Loading branch information
will@2012 committed Apr 17, 2024
1 parent 0b1f3ed commit 09eee0a
Show file tree
Hide file tree
Showing 13 changed files with 280 additions and 74 deletions.
31 changes: 31 additions & 0 deletions common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,3 +475,34 @@ func (d *Decimal) UnmarshalJSON(input []byte) error {
return err
}
}

// ProofList implements ethdb.KeyValueWriter and collects the proofs as
// hex-strings for delivery to rpc-caller.
type ProofList []string

func (n *ProofList) Put(key []byte, value []byte) error {
*n = append(*n, hexutil.Encode(value))
return nil
}

func (n *ProofList) Delete(key []byte) error {
panic("not supported")
}

// AccountResult is the result of a GetProof operation.
type AccountResult struct {
Address Address `json:"address"`
AccountProof []string `json:"accountProof"`
Balance *big.Int `json:"balance"`
CodeHash Hash `json:"codeHash"`
Nonce uint64 `json:"nonce"`
StorageHash Hash `json:"storageHash"`
StorageProof []StorageResult `json:"storageProof"`
}

// StorageResult provides a proof for a key-value pair.
type StorageResult struct {
Key string `json:"key"`
Value *big.Int `json:"value"`
Proof []string `json:"proof"`
}
62 changes: 42 additions & 20 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/trie"
"github.com/ethereum/go-ethereum/trie/triedb/hashdb"
"github.com/ethereum/go-ethereum/trie/triedb/pathdb"
Expand Down Expand Up @@ -83,13 +84,13 @@ var (
blockExecutionTimer = metrics.NewRegisteredTimer("chain/execution", nil)
blockWriteTimer = metrics.NewRegisteredTimer("chain/write", nil)

blockWriteExternalTimer = metrics.NewRegisteredTimer("chain/block/write/external", nil)
stateCommitExternalTimer = metrics.NewRegisteredTimer("chain/state/commit/external", nil)
blockWriteExternalTimer = metrics.NewRegisteredTimer("chain/block/write/external", nil)
stateCommitExternalTimer = metrics.NewRegisteredTimer("chain/state/commit/external", nil)
triedbCommitExternalTimer = metrics.NewRegisteredTimer("chain/triedb/commit/external", nil)
innerExecutionTimer = metrics.NewRegisteredTimer("chain/inner/execution", nil)
innerExecutionTimer = metrics.NewRegisteredTimer("chain/inner/execution", nil)

blockGasUsedGauge = metrics.NewRegisteredGauge("chain/block/gas/used", nil)
mgaspsGauge = metrics.NewRegisteredGauge("chain/mgas/ps", nil)
mgaspsGauge = metrics.NewRegisteredGauge("chain/mgas/ps", nil)

blockReorgMeter = metrics.NewRegisteredMeter("chain/reorg/executes", nil)
blockReorgAddMeter = metrics.NewRegisteredMeter("chain/reorg/add", nil)
Expand Down Expand Up @@ -160,6 +161,7 @@ type CacheConfig struct {
SnapshotWait bool // Wait for snapshot construction on startup. TODO(karalabe): This is a dirty hack for testing, nuke it

TrieCommitInterval uint64 // Define a block height interval, commit trie every TrieCommitInterval block height.
RpcClient *rpc.Client
}

// triedbConfig derives the configures for trie database.
Expand All @@ -177,6 +179,8 @@ func (c *CacheConfig) triedbConfig() *trie.Config {
CleanCacheSize: c.TrieCleanLimit * 1024 * 1024,
DirtyCacheSize: c.TrieDirtyLimit * 1024 * 1024,
ProposeBlockInterval: c.ProposeBlockInterval,
KeepCh: make(chan *pathdb.KeepRecord),
WaitKeepCh: make(chan struct{}),
}
}
return config
Expand Down Expand Up @@ -278,6 +282,8 @@ type BlockChain struct {
processor Processor // Block transaction processor interface
forker *ForkChoice
vmConfig vm.Config

proofKeeper *proofKeeper
}

// NewBlockChain returns a fully initialised block chain using information
Expand All @@ -287,8 +293,10 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
if cacheConfig == nil {
cacheConfig = defaultCacheConfig
}

// Open trie database with provided config
triedb := trie.NewDatabase(db, cacheConfig.triedbConfig())
trieConfig := cacheConfig.triedbConfig()
triedb := trie.NewDatabase(db, trieConfig)

// Setup the genesis block, commit the provided genesis specification
// to database if the genesis block is not present yet, or load the
Expand Down Expand Up @@ -337,6 +345,20 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
bc.processor = NewStateProcessor(chainConfig, bc, engine)

var err error
if cacheConfig.StateScheme == rawdb.PathScheme && trieConfig.PathDB.TrieNodeBufferType == pathdb.NodeBufferList {
opts := &proofKeeperOptions{
enable: true,
keepInterval: trieConfig.PathDB.ProposeBlockInterval,
watchStartKeepCh: trieConfig.PathDB.KeepCh,
notifyFinishKeepCh: trieConfig.PathDB.WaitKeepCh,
rpcClient: cacheConfig.RpcClient,
}
bc.proofKeeper, err = newProofKeeper(db, opts)
if err != nil {
return nil, err
}
}

bc.hc, err = NewHeaderChain(db, chainConfig, engine, bc.insertStopped)
if err != nil {
return nil, err
Expand Down Expand Up @@ -1460,7 +1482,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
}
blockWriteExternalTimer.UpdateSince(start)
log.Debug("blockWriteExternalTimer", "duration", common.PrettyDuration(time.Since(start)), "hash", block.Hash())

// Commit all cached state changes into underlying memory database.
start = time.Now()
root, err := state.Commit(block.NumberU64(), bc.chainConfig.IsEIP158(block.Number()))
Expand All @@ -1477,10 +1499,10 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
}
// If we're running an archive node, always flush
start = time.Now()
defer func () {
defer func() {
triedbCommitExternalTimer.UpdateSince(start)
log.Debug("triedbCommitExternalTimer", "duration", common.PrettyDuration(time.Since(start)), "hash", block.Hash())
} ()
}()
if bc.cacheConfig.TrieDirtyDisabled {
return bc.triedb.Commit(root, false)
}
Expand Down Expand Up @@ -1785,7 +1807,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
}
}()

defer func () {
defer func() {
DebugInnerExecutionDuration = 0
}()
for ; block != nil && err == nil || errors.Is(err, ErrKnownBlock); block, err = it.next() {
Expand Down Expand Up @@ -1908,16 +1930,16 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
proctime := time.Since(start) // processing + validation

// Update the metrics touched during block processing and validation
accountReadTimer.Update(statedb.AccountReads) // Account reads are complete(in processing)
storageReadTimer.Update(statedb.StorageReads) // Storage reads are complete(in processing)
snapshotAccountReadTimer.Update(statedb.SnapshotAccountReads) // Account reads are complete(in processing)
snapshotStorageReadTimer.Update(statedb.SnapshotStorageReads) // Storage reads are complete(in processing)
accountUpdateTimer.Update(statedb.AccountUpdates) // Account updates are complete(in validation)
storageUpdateTimer.Update(statedb.StorageUpdates) // Storage updates are complete(in validation)
accountHashTimer.Update(statedb.AccountHashes) // Account hashes are complete(in validation)
storageHashTimer.Update(statedb.StorageHashes) // Storage hashes are complete(in validation)
blockExecutionTimer.Update(ptime) // The time spent on block execution
blockValidationTimer.Update(vtime) // The time spent on block validation
accountReadTimer.Update(statedb.AccountReads) // Account reads are complete(in processing)
storageReadTimer.Update(statedb.StorageReads) // Storage reads are complete(in processing)
snapshotAccountReadTimer.Update(statedb.SnapshotAccountReads) // Account reads are complete(in processing)
snapshotStorageReadTimer.Update(statedb.SnapshotStorageReads) // Storage reads are complete(in processing)
accountUpdateTimer.Update(statedb.AccountUpdates) // Account updates are complete(in validation)
storageUpdateTimer.Update(statedb.StorageUpdates) // Storage updates are complete(in validation)
accountHashTimer.Update(statedb.AccountHashes) // Account hashes are complete(in validation)
storageHashTimer.Update(statedb.StorageHashes) // Storage hashes are complete(in validation)
blockExecutionTimer.Update(ptime) // The time spent on block execution
blockValidationTimer.Update(vtime) // The time spent on block validation

innerExecutionTimer.Update(DebugInnerExecutionDuration)

Expand Down Expand Up @@ -1959,7 +1981,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
}
trieDiffNodes, trieBufNodes, trieImmutableBufNodes, _ := bc.triedb.Size()
stats.report(chain, it.index, snapDiffItems, snapBufItems, trieDiffNodes, trieBufNodes, trieImmutableBufNodes, setHead)
blockGasUsedGauge.Update(int64(block.GasUsed())/1000000)
blockGasUsedGauge.Update(int64(block.GasUsed()) / 1000000)

if !setHead {
// After merge we expect few side chains. Simply count
Expand Down
105 changes: 105 additions & 0 deletions core/proof_keeper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package core

import (
"context"
"math/big"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/trie/triedb/pathdb"
)

const (
l2ToL1MessagePasser = "0x4200000000000000000000000000000000000016"
)

var (
l2ToL1MessagePasserAddr = common.HexToAddress(l2ToL1MessagePasser)
)

type proofDataRecord struct {
Address common.Address `json:"address"`
AccountProof []string `json:"accountProof"`
Balance *big.Int `json:"balance"`
CodeHash common.Hash `json:"codeHash"`
Nonce uint64 `json:"nonce"`
StorageHash common.Hash `json:"storageHash"`
}

type proofKeeperOptions struct {
enable bool
keepInterval uint64
watchStartKeepCh chan *pathdb.KeepRecord
notifyFinishKeepCh chan struct{}
rpcClient *rpc.Client
}

type proofKeeper struct {
opts *proofKeeperOptions
keeperMetaDB ethdb.Database
proofDataDB *rawdb.ResettableFreezer
selfClient *ethclient.Client
}

func newProofKeeper(keeperMetaDB ethdb.Database, opts *proofKeeperOptions) (*proofKeeper, error) {
var (
err error
ancientDir string
keeper *proofKeeper
)

if ancientDir, err = keeperMetaDB.AncientDatadir(); err != nil {
log.Error("Failed to get ancient data dir", "error", err)
return nil, err
}
keeper = &proofKeeper{
opts: opts,
keeperMetaDB: keeperMetaDB,
selfClient: ethclient.NewClient(opts.rpcClient),
}
if keeper.proofDataDB, err = rawdb.NewProofFreezer(ancientDir, false); err != nil {
log.Error("Failed to new proof ancient freezer", "error", err)
return nil, err
}

go keeper.eventLoop()

log.Info("Succeed to init proof keeper", "options", opts)
return keeper, nil
}

func (keeper *proofKeeper) queryProposedProof(kRecord *pathdb.KeepRecord) (*proofDataRecord, error) {
rawPoof, err := keeper.selfClient.GetProof(context.Background(), l2ToL1MessagePasserAddr, nil, hexutil.EncodeUint64(kRecord.BlockID))
// rawPoof, err := keeper.selfClient.GetProof(context.Background(), l2ToL1MessagePasserAddr, nil, strconv.FormatUint(kRecord.BlockID, 10))

if err != nil {
log.Error("Failed to get proof", "block_id", kRecord.BlockID, "hex_block_id", hexutil.EncodeUint64(kRecord.BlockID), "error", err)
return nil, err
}
pRecord := &proofDataRecord{
Address: rawPoof.Address,
AccountProof: rawPoof.AccountProof,
Balance: rawPoof.Balance,
CodeHash: rawPoof.CodeHash,
Nonce: rawPoof.Nonce,
StorageHash: rawPoof.StorageHash,
}
log.Info("Succeed to get proof", "proof_record", pRecord)
return pRecord, nil
}

func (keeper *proofKeeper) eventLoop() {
for {
select {
case r := <-keeper.opts.watchStartKeepCh:
log.Info("keep proof", "record", r)
keeper.queryProposedProof(r)
keeper.opts.notifyFinishKeepCh <- struct{}{}
}
}
}
1 change: 1 addition & 0 deletions core/proof_keeper_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package core
14 changes: 14 additions & 0 deletions core/rawdb/ancient_scheme.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,19 @@ var stateFreezerNoSnappy = map[string]bool{
stateHistoryStorageData: false,
}

const (
proposeProofTable = "propose_proof"
)

var proofFreezerNoSnappy = map[string]bool{
proposeProofTable: true,
}

// The list of identifiers of ancient stores.
var (
chainFreezerName = "chain" // the folder name of chain segment ancient store.
stateFreezerName = "state" // the folder name of reverse diff ancient store.
proofFreezerName = "proof" // the folder name of propose withdraw proof store
)

// freezers the collections of all builtin freezers.
Expand All @@ -79,3 +88,8 @@ var freezers = []string{chainFreezerName, stateFreezerName}
func NewStateFreezer(ancientDir string, readOnly bool) (*ResettableFreezer, error) {
return NewResettableFreezer(filepath.Join(ancientDir, stateFreezerName), "eth/db/state", readOnly, stateHistoryTableSize, stateFreezerNoSnappy)
}

// NewProofFreezer initializes the freezer for propose withdraw proof.
func NewProofFreezer(ancientDir string, readOnly bool) (*ResettableFreezer, error) {
return NewResettableFreezer(filepath.Join(ancientDir, proofFreezerName), "eth/db/proof", readOnly, stateHistoryTableSize, proofFreezerNoSnappy)
}
1 change: 1 addition & 0 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
TrieCommitInterval: config.TrieCommitInterval,
PathNodeBuffer: config.PathNodeBuffer,
ProposeBlockInterval: config.ProposeBlockInterval,
RpcClient: stack.Attach(),
}
)
// Override the chain config with provided settings.
Expand Down
21 changes: 21 additions & 0 deletions ethclient/ethclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -680,3 +680,24 @@ func (p *rpcProgress) toSyncProgress() *ethereum.SyncProgress {
HealingBytecode: uint64(p.HealingBytecode),
}
}

// todo:
func (ec *Client) GetProof(ctx context.Context, address common.Address, storage []common.Hash, blockTag string) (*common.AccountResult, error) {
var getProofResponse *common.AccountResult
err := ec.c.CallContext(ctx, &getProofResponse, "eth_getProof", address, storage, blockTag)
if err != nil {
return nil, err
}
if getProofResponse == nil {
return nil, ethereum.NotFound
}
if len(getProofResponse.StorageProof) != len(storage) {
return nil, fmt.Errorf("missing storage proof data, got %d proof entries but requested %d storage keys", len(getProofResponse.StorageProof), len(storage))
}
for i, key := range storage {
if key.String() != getProofResponse.StorageProof[i].Key {
return nil, fmt.Errorf("unexpected storage proof key difference for entry %d: got %s but requested %s", i, getProofResponse.StorageProof[i].Key, key)
}
}
return getProofResponse, nil
}
Loading

0 comments on commit 09eee0a

Please sign in to comment.