Pebble opt #4

Draft · wants to merge 6 commits into develop

Changes from all commits
8 changes: 3 additions & 5 deletions core/blockchain.go
@@ -1882,12 +1882,10 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals, setHead bool)
storageUpdateTimer.Update(statedb.StorageUpdates) // Storage updates are complete (in validation)
accountHashTimer.Update(statedb.AccountHashes) // Account hashes are complete (in validation)
storageHashTimer.Update(statedb.StorageHashes) // Storage hashes are complete (in validation)
triehash := statedb.AccountHashes + statedb.StorageHashes // The time spent on tries hashing
trieUpdate := statedb.AccountUpdates + statedb.StorageUpdates // The time spent on tries update
trieRead := statedb.SnapshotAccountReads + statedb.AccountReads // The time spent on account read
trieRead += statedb.SnapshotStorageReads + statedb.StorageReads // The time spent on storage read
blockExecutionTimer.Update(ptime - trieRead) // The time spent on EVM processing
blockValidationTimer.Update(vtime - (triehash + trieUpdate)) // The time spent on block validation
blockExecutionTimer.Update(ptime) // The time spent on EVM processing
blockValidationTimer.Update(vtime) // The time spent on block validation

// Write the block to the chain and get the status.
var (
@@ -1919,7 +1917,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals, setHead bool)
snapshotCommitTimer.Update(statedb.SnapshotCommits) // Snapshot commits are complete, we can mark them
triedbCommitTimer.Update(statedb.TrieDBCommits) // Trie database commits are complete, we can mark them

blockWriteTimer.Update(time.Since(wstart) - statedb.AccountCommits - statedb.StorageCommits - statedb.SnapshotCommits - statedb.TrieDBCommits)
blockWriteTimer.Update(time.Since(wstart))
blockInsertTimer.UpdateSince(start)

// Report the import stats before returning the various results
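For context on the two hunks above: the PR stops netting trie access time out of the coarse timers, so blockExecutionTimer, blockValidationTimer and blockWriteTimer now record gross wall time, with the fine-grained breakdown delegated to the new perf/* timers added in the files below. A minimal sketch of the metrics pattern involved (the processBlock wrapper is illustrative, not code from this PR):

package sketch

import (
	"time"

	"github.com/ethereum/go-ethereum/metrics"
)

// Timers register themselves in the default registry under their name.
var blockExecutionTimer = metrics.NewRegisteredTimer("chain/execution", nil)

// processBlock is an illustrative stand-in for insertChain's accounting.
func processBlock(run func()) {
	pstart := time.Now()
	run() // execute the block's transactions through the EVM
	ptime := time.Since(pstart)

	// Gross accounting: attribute the whole wall time to execution; the old
	// code first subtracted snapshot/trie read time from ptime.
	blockExecutionTimer.Update(ptime)
}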
7 changes: 7 additions & 0 deletions core/state/snapshot/difflayer.go
@@ -27,6 +27,7 @@ import (
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/rlp"
bloomfilter "github.com/holiman/bloomfilter/v2"
)
@@ -77,6 +78,10 @@
bloomStorageHasherOffset = 0
)

var (
perfGetSnapshotDiffLayerAccountTimer = metrics.NewRegisteredTimer("perf/get/snapshot/diff/layer/account/time", nil)
)

func init() {
// Init the bloom offsets in the range [0:24] (requires 8 bytes)
bloomDestructHasherOffset = rand.Intn(25)
@@ -273,6 +278,8 @@ func (dl *diffLayer) Stale() bool {
// Account directly retrieves the account associated with a particular hash in
// the snapshot slim data format.
func (dl *diffLayer) Account(hash common.Hash) (*Account, error) {
start := time.Now()
defer perfGetSnapshotDiffLayerAccountTimer.UpdateSince(start)
data, err := dl.AccountRLP(hash)
if err != nil {
return nil, err
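The instrumentation added here is the pattern this PR repeats in every file that follows: capture a start time on entry and let a deferred UpdateSince record the elapsed time on every return path. A self-contained sketch (readAccount is an illustrative stand-in for diffLayer.Account):

package sketch

import (
	"time"

	"github.com/ethereum/go-ethereum/metrics"
)

var accountTimer = metrics.NewRegisteredTimer("perf/get/snapshot/diff/layer/account/time", nil)

// readAccount times a lookup; the deferred UpdateSince fires on every return
// path, so error returns are measured too.
func readAccount(lookup func() ([]byte, error)) ([]byte, error) {
	start := time.Now()
	defer accountTimer.UpdateSince(start)

	data, err := lookup()
	if err != nil {
		return nil, err
	}
	return data, nil
}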
8 changes: 8 additions & 0 deletions core/state/snapshot/disklayer.go
@@ -19,15 +19,21 @@ package snapshot
import (
"bytes"
"sync"
"time"

"github.com/VictoriaMetrics/fastcache"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie"
)

var (
perfGetSnapshotDiskLayerAccountTimer = metrics.NewRegisteredTimer("perf/get/snapshot/disk/layer/account/time", nil)
)

// diskLayer is a low level persistent snapshot built on top of a key-value store.
type diskLayer struct {
diskdb ethdb.KeyValueStore // Key-value store containing the base snapshot
@@ -66,6 +72,8 @@ func (dl *diskLayer) Stale() bool {
// Account directly retrieves the account associated with a particular hash in
// the snapshot slim data format.
func (dl *diskLayer) Account(hash common.Hash) (*Account, error) {
start := time.Now()
defer perfGetSnapshotDiskLayerAccountTimer.UpdateSince(start)
data, err := dl.AccountRLP(hash)
if err != nil {
return nil, err
15 changes: 15 additions & 0 deletions core/state/statedb.go
@@ -40,6 +40,13 @@ import (

const defaultNumOfSlots = 100

var (
perfGetStateObjectTimer = metrics.NewRegisteredTimer("perf/get/state/object/time", nil)
perfIntermediateRootTimer = metrics.NewRegisteredTimer("perf/intermediate/root/time", nil)
perfStateDBCommitTimer = metrics.NewRegisteredTimer("perf/state/db/commit/time", nil)
perfStateDBFinaliseTimer = metrics.NewRegisteredTimer("perf/state/db/finalise/time", nil)
)

type revision struct {
id int
journalIndex int
@@ -563,6 +570,8 @@ func (s *StateDB) deleteStateObject(obj *stateObject) {
// the object is not found or was deleted in this execution context. If you need
// to differentiate between non-existent/just-deleted, use getDeletedStateObject.
func (s *StateDB) getStateObject(addr common.Address) *stateObject {
start := time.Now()
defer perfGetStateObjectTimer.UpdateSince(start)
if obj := s.getDeletedStateObject(addr); obj != nil && !obj.deleted {
return obj
}
@@ -866,6 +875,8 @@ func (s *StateDB) GetRefund() uint64 {
// the journal as well as the refunds. Finalise, however, will not push any updates
// into the tries just yet. Only IntermediateRoot or Commit will do that.
func (s *StateDB) Finalise(deleteEmptyObjects bool) {
start := time.Now()
defer perfStateDBFinaliseTimer.UpdateSince(start)
addressesToPrefetch := make([][]byte, 0, len(s.journal.dirties))
for addr := range s.journal.dirties {
obj, exist := s.stateObjects[addr]
@@ -915,6 +926,8 @@ func (s *StateDB) Finalise(deleteEmptyObjects bool) {
// It is called in between transactions to get the root hash that
// goes into transaction receipts.
func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash {
start := time.Now()
defer perfIntermediateRootTimer.UpdateSince(start)
// Finalise all the dirty storage states and write them into the tries
s.Finalise(deleteEmptyObjects)

@@ -992,6 +1005,8 @@ func (s *StateDB) clearJournalAndRefund() {

// Commit writes the state to the underlying in-memory trie database.
func (s *StateDB) Commit(deleteEmptyObjects bool, postCommitFuncs ...func() error) (common.Hash, error) {
start := time.Now()
defer perfStateDBCommitTimer.UpdateSince(start)
// Short circuit in case any database failure occurred earlier.
if s.dbErr != nil {
return common.Hash{}, fmt.Errorf("commit aborted due to earlier error: %v", s.dbErr)
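Taken together, the four statedb timers bracket successive phases of the state lifecycle during block import. A hedged sketch of where each fires (plumbing elided; note that IntermediateRoot calls Finalise internally, so those two timers overlap, and getStateObject is hit on every account access during EVM execution, so perf/get/state/object/time will dominate in count):

package sketch

import "github.com/ethereum/go-ethereum/core/state"

// importBlock sketches where the new statedb timers fire; it is illustrative,
// not code from this PR.
func importBlock(statedb *state.StateDB) error {
	// Per transaction (post-Byzantium): fold the journal into pending state.
	// Measured by perf/state/db/finalise/time.
	statedb.Finalise(true)

	// Per block (per transaction pre-Byzantium, for receipt roots): hash the
	// pending updates into the tries. Measured by perf/intermediate/root/time.
	_ = statedb.IntermediateRoot(true)

	// When the block is written: flush state into the in-memory trie database.
	// Measured by perf/state/db/commit/time.
	_, err := statedb.Commit(true)
	return err
}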
8 changes: 8 additions & 0 deletions core/state_processor.go
@@ -19,6 +19,7 @@ package core
import (
"fmt"
"math/big"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus"
@@ -27,9 +28,14 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/params"
)

var (
perfProcessTxTime = metrics.NewRegisteredTimer("perf/process/tx/time", nil)
)

// StateProcessor is a basic Processor, which takes care of transitioning
// state from one point to another.
//
@@ -77,6 +83,7 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg
vmenv := vm.NewEVM(blockContext, vm.TxContext{}, statedb, p.config, cfg)
// Iterate over and process the individual transactions
for i, tx := range block.Transactions() {
start := time.Now()
msg, err := TransactionToMessage(tx, types.MakeSigner(p.config, header.Number), header.BaseFee)
if err != nil {
return nil, nil, 0, fmt.Errorf("could not apply tx %d [%v]: %w", i, tx.Hash().Hex(), err)
@@ -88,6 +95,7 @@
}
receipts = append(receipts, receipt)
allLogs = append(allLogs, receipt.Logs...)
perfProcessTxTime.UpdateSince(start)
}
// Fail if Shanghai not enabled and len(withdrawals) is non-zero.
withdrawals := block.Withdrawals()
6 changes: 6 additions & 0 deletions ethdb/leveldb/leveldb.go
@@ -189,6 +189,8 @@ func (db *Database) Has(key []byte) (bool, error) {

// Get retrieves the given key if it's present in the key-value store.
func (db *Database) Get(key []byte) ([]byte, error) {
start := time.Now()
defer ethdb.PerfDBGetTimer.UpdateSince(start)
dat, err := db.db.Get(key, nil)
if err != nil {
return nil, err
@@ -198,6 +200,8 @@ func (db *Database) Get(key []byte) ([]byte, error) {

// Put inserts the given value into the key-value store.
func (db *Database) Put(key []byte, value []byte) error {
start := time.Now()
defer ethdb.PerfDBPutTimer.UpdateSince(start)
return db.db.Put(key, value, nil)
}

@@ -497,6 +501,8 @@ func (b *batch) ValueSize() int {

// Write flushes any accumulated data to disk.
func (b *batch) Write() error {
start := time.Now()
defer ethdb.PerfDBBatchWriteTimer.UpdateSince(start)
return b.db.Write(b.b, nil)
}

10 changes: 10 additions & 0 deletions ethdb/metrics.go
@@ -0,0 +1,10 @@
package ethdb

import "github.com/ethereum/go-ethereum/metrics"

var (
// Shared timers for database performance profiling.
PerfDBGetTimer = metrics.NewRegisteredTimer("perf/db/get/time", nil)
PerfDBPutTimer = metrics.NewRegisteredTimer("perf/db/put/time", nil)
PerfDBBatchWriteTimer = metrics.NewRegisteredTimer("perf/db/batch/write/time", nil)
)
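Putting the timers in the ethdb package (rather than in each driver) means the leveldb and pebble backends update the same registered instances. A hedged sketch of reading them back by name (assumes metrics collection is enabled; with it disabled these are no-op timers and the counts stay zero):

package sketch

import (
	"fmt"
	"time"

	"github.com/ethereum/go-ethereum/ethdb"
	"github.com/ethereum/go-ethereum/metrics"
)

// reportDBPerf reads the shared timers back out of the default registry.
func reportDBPerf() {
	t := metrics.GetOrRegisterTimer("perf/db/get/time", nil)
	fmt.Printf("db gets: count=%d mean=%v\n", t.Count(), time.Duration(t.Mean()))

	// Equivalently, use the exported package-level variables directly.
	fmt.Printf("batch writes: count=%d\n", ethdb.PerfDBBatchWriteTimer.Count())
}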
59 changes: 45 additions & 14 deletions ethdb/pebble/pebble.go
@@ -27,6 +27,7 @@ import (
"sync/atomic"
"time"

"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble"
"github.com/cockroachdb/pebble/bloom"
"github.com/ethereum/go-ethereum/common"
@@ -115,6 +116,16 @@ func (d *Database) onWriteStallEnd() {
atomic.AddInt64(&d.writeDelayTime, int64(time.Since(d.writeDelayStartTime)))
}

// panicLogger discards Pebble's informational output and panics on fatal
// errors, surfacing them to the caller instead of letting Pebble terminate
// the process.
type panicLogger struct{}

func (l panicLogger) Infof(format string, args ...interface{}) {
}

func (l panicLogger) Fatalf(format string, args ...interface{}) {
panic(errors.Errorf("fatal: "+format, args...))
}

// New returns a wrapped pebble DB object. The namespace is the prefix that the
// metrics reporting should use for surfacing internal stats.
func New(file string, cache int, handles int, namespace string, readonly bool) (*Database, error) {
@@ -126,7 +137,6 @@ func New(file string, cache int, handles int, namespace string, readonly bool) (
handles = minHandles
}
logger := log.New("database", file)
logger.Info("Allocated cache and file handles", "cache", common.StorageSize(cache*1024*1024), "handles", handles)

// The max memtable size is limited by the uint32 offsets stored in
// internal/arenaskl.node, DeferredBatchOp, and flushableBatchEntry.
@@ -137,9 +147,20 @@ func New(file string, cache int, handles int, namespace string, readonly bool) (
// including a frozen memory table and another live one.
memTableLimit := 2
memTableSize := cache * 1024 * 1024 / 2 / memTableLimit
if memTableSize > maxMemTableSize {
memTableSize = maxMemTableSize

// The memory table size is currently capped at maxMemTableSize-1 due to a
// known bug in pebble where maxMemTableSize itself is not accepted as a
// valid size.
//
// TODO: use maxMemTableSize as the cap once the pebble issue is fixed.
if memTableSize >= maxMemTableSize {
memTableSize = maxMemTableSize - 1
}

logger.Info("Allocated cache and file handles", "cache", common.StorageSize(cache*1024*1024),
"handles", handles, "memory table", common.StorageSize(memTableSize))

db := &Database{
fn: file,
log: logger,
Expand All @@ -154,7 +175,7 @@ func New(file string, cache int, handles int, namespace string, readonly bool) (

// The size of the memory table (as well as the write buffer).
// Note that there may be more than two memory tables in the system.
MemTableSize: memTableSize,
MemTableSize: uint64(memTableSize),

// MemTableStopWritesThreshold places a hard limit on the size
// of the existing MemTables (including the frozen one).
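A worked example of the memtable sizing above (the maxMemTableSize constant is assumed to be pebble's roughly 4 GiB arena cap referenced in the earlier comment; its exact value is not shown in this diff):

package sketch

// maxMemTableSize is an assumed stand-in for the package constant.
const maxMemTableSize = 4 << 30

// memTableSizeFor mirrors the sizing logic: half of the cache (in MiB) is
// split across two memtables (one live, one frozen), capped one byte under
// pebble's limit to dodge the bug noted above.
func memTableSizeFor(cacheMiB int) int {
	memTableLimit := 2
	memTableSize := cacheMiB * 1024 * 1024 / 2 / memTableLimit
	if memTableSize >= maxMemTableSize {
		memTableSize = maxMemTableSize - 1
	}
	return memTableSize
}

// e.g. memTableSizeFor(1024) == 256 MiB, while a 32 GiB cache hits the cap.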
@@ -169,6 +190,13 @@ func New(file string, cache int, handles int, namespace string, readonly bool) (

// Per-level options. Options for at least one level must be specified. The
// options for the last level are used for all subsequent levels.
ReadOnly: readonly,
EventListener: &pebble.EventListener{
CompactionBegin: db.onCompactionBegin,
CompactionEnd: db.onCompactionEnd,
WriteStallBegin: db.onWriteStallBegin,
WriteStallEnd: db.onWriteStallEnd,
},
Levels: []pebble.LevelOptions{
{TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
{TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
@@ -178,14 +206,9 @@ func New(file string, cache int, handles int, namespace string, readonly bool) (
{TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
{TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
},
ReadOnly: readonly,
EventListener: &pebble.EventListener{
CompactionBegin: db.onCompactionBegin,
CompactionEnd: db.onCompactionEnd,
WriteStallBegin: db.onWriteStallBegin,
WriteStallEnd: db.onWriteStallEnd,
},
Logger: panicLogger{}, // TODO(karalabe): Delete when this is upstreamed in Pebble
}

// Disable seek compaction explicitly. Check https://github.com/ethereum/go-ethereum/pull/20130
// for more details.
opt.Experimental.ReadSamplingMultiplier = -1
@@ -247,6 +270,8 @@ func (d *Database) Has(key []byte) (bool, error) {

// Get retrieves the given key if it's present in the key-value store.
func (d *Database) Get(key []byte) ([]byte, error) {
start := time.Now()
defer ethdb.PerfDBGetTimer.UpdateSince(start)
dat, closer, err := d.db.Get(key)
if err != nil {
return nil, err
@@ -259,6 +284,8 @@

// Put inserts the given value into the key-value store.
func (d *Database) Put(key []byte, value []byte) error {
start := time.Now()
defer ethdb.PerfDBPutTimer.UpdateSince(start)
return d.db.Set(key, value, pebble.NoSync)
}

@@ -279,9 +306,10 @@ func (d *Database) NewBatch() ethdb.Batch {
// The requested size is now passed through to pebble's NewBatchWithSize so
// the batch's internal buffer is pre-allocated up front, rather than grown
// on demand as writes accumulate.
func (d *Database) NewBatchWithSize(_ int) ethdb.Batch {
func (d *Database) NewBatchWithSize(size int) ethdb.Batch {
return &batch{
b: d.db.NewBatch(),
b: d.db.NewBatchWithSize(size),
db: d,
}
}
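With this change the requested size reaches pebble's own NewBatchWithSize, so the batch buffer is reserved up front instead of grown on demand. A hedged usage sketch against the ethdb.Batch interface (the 64 KiB figure is illustrative):

package sketch

import "github.com/ethereum/go-ethereum/ethdb"

// writeSized stages writes in a pre-sized batch and flushes them in one call.
func writeSized(db ethdb.Database, kvs map[string][]byte) error {
	batch := db.NewBatchWithSize(64 * 1024) // reserve ~64 KiB up front
	for k, v := range kvs {
		if err := batch.Put([]byte(k), v); err != nil {
			return err
		}
	}
	// Write is where the new perf/db/batch/write/time timer fires.
	return batch.Write()
}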

@@ -478,6 +506,7 @@ func (d *Database) meter(refresh time.Duration) {
// when Write is called. A batch cannot be used concurrently.
type batch struct {
b *pebble.Batch
db *Database
size int
}

@@ -502,6 +531,8 @@ func (b *batch) ValueSize() int {

// Write flushes any accumulated data to disk.
func (b *batch) Write() error {
start := time.Now()
defer ethdb.PerfDBBatchWriteTimer.UpdateSince(start)
return b.b.Commit(pebble.NoSync)
}

@@ -543,7 +574,7 @@ type pebbleIterator struct {
// of database content with a particular key prefix, starting at a particular
// initial key (or after, if it does not exist).
func (d *Database) NewIterator(prefix []byte, start []byte) ethdb.Iterator {
iter := d.db.NewIter(&pebble.IterOptions{
iter, _ := d.db.NewIter(&pebble.IterOptions{
LowerBound: append(prefix, start...),
UpperBound: upperBound(prefix),
})