Skip to content

Commit

Permalink
feat: checkpoint use shared blockcache and tablecache
Browse files Browse the repository at this point in the history
  • Loading branch information
will@2012 committed Apr 1, 2024
1 parent 88e9879 commit 8fcb8dd
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 27 deletions.
52 changes: 52 additions & 0 deletions ethdb/pebble/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -674,3 +674,55 @@ func (d *Database) NewCheckpoint(destDir string) error {
opts = append(opts, opt)
return d.db.Checkpoint(destDir, opts...)
}

// OpenCheckpointDB returns a wrapped checkpoint pebble DB object. The namespace is the prefix that the
// metrics reporting should use for surfacing internal stats.
func OpenCheckpointDB(file string, blockCache *pebble.Cache, tableCache *pebble.TableCache) (*Database, error) {
namespace := "checkpoint"
logger := log.New(namespace, file)
logger.Info("Open checkpoint db", "checkpoint_dir", file)

db := &Database{
fn: file,
log: logger,
quitChan: make(chan chan error),
}
opt := &pebble.Options{
Cache: blockCache,
TableCache: tableCache,
MaxConcurrentCompactions: func() int { return 1 },
ReadOnly: true,
EventListener: &pebble.EventListener{
CompactionBegin: db.onCompactionBegin,
CompactionEnd: db.onCompactionEnd,
WriteStallBegin: db.onWriteStallBegin,
WriteStallEnd: db.onWriteStallEnd,
},
Logger: panicLogger{}, // TODO(karalabe): Delete when this is upstreamed in Pebble
}

// Open the db and recover any potential corruptions
innerDB, err := pebble.Open(file, opt)
if err != nil {
return nil, err
}
db.db = innerDB

db.compTimeMeter = metrics.NewRegisteredMeter(namespace+"compact/time", nil)
db.compReadMeter = metrics.NewRegisteredMeter(namespace+"compact/input", nil)
db.compWriteMeter = metrics.NewRegisteredMeter(namespace+"compact/output", nil)
db.diskSizeGauge = metrics.NewRegisteredGauge(namespace+"disk/size", nil)
db.diskReadMeter = metrics.NewRegisteredMeter(namespace+"disk/read", nil)
db.diskWriteMeter = metrics.NewRegisteredMeter(namespace+"disk/write", nil)
db.writeDelayMeter = metrics.NewRegisteredMeter(namespace+"compact/writedelay/duration", nil)
db.writeDelayNMeter = metrics.NewRegisteredMeter(namespace+"compact/writedelay/counter", nil)
db.memCompGauge = metrics.NewRegisteredGauge(namespace+"compact/memory", nil)
db.level0CompGauge = metrics.NewRegisteredGauge(namespace+"compact/level0", nil)
db.nonlevel0CompGauge = metrics.NewRegisteredGauge(namespace+"compact/nonlevel0", nil)
db.seekCompGauge = metrics.NewRegisteredGauge(namespace+"compact/seek", nil)
db.manualMemAllocGauge = metrics.NewRegisteredGauge(namespace+"memory/manualalloc", nil)

// Start up the metrics gathering and return
go db.meter(metricsGatheringInterval, namespace)
return db, nil
}
74 changes: 47 additions & 27 deletions trie/triedb/pathdb/checkpoint_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ import (
"errors"
"fmt"
"io"
"io/ioutil"
"math"
"os"
"path/filepath"
"runtime"
"strconv"
"strings"
"sync"
Expand All @@ -17,9 +17,12 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/ethdb/pebble"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/trie/trienode"
"github.com/ethereum/go-ethereum/trie/triestate"

pebbledb "github.com/cockroachdb/pebble"
)

const (
Expand All @@ -43,7 +46,7 @@ const (
checkDBOpenedIntervalSecond = 1

// defaultMaxOpenedNumber is max opened number.
defaultMaxOpenedNumber = 3
defaultMaxOpenedNumber = 10
)

// encodeSubCheckpointDir encodes a completed checkpoint directory.
Expand All @@ -70,6 +73,7 @@ func decodeSubCheckpointDir(subCheckpointDir string) (blockNumber uint64, blockR

// checkpointLayer only provides node interface.
type checkpointLayer struct {
cm *checkpointManager
blockNumber uint64
root common.Hash
checkpointDir string
Expand All @@ -88,7 +92,7 @@ type checkpointLayer struct {
var _ layer = &checkpointLayer{}

// newCheckpointLayer is used to return make a checkpoint instance.
func newCheckpointLayer(checkpointDir string) (ckptLayer *checkpointLayer, err error) {
func newCheckpointLayer(cm *checkpointManager, checkpointDir string) (ckptLayer *checkpointLayer, err error) {
var (
blockNumber uint64
blockRoot common.Hash
Expand All @@ -99,6 +103,7 @@ func newCheckpointLayer(checkpointDir string) (ckptLayer *checkpointLayer, err e
return nil, err
}
ckptLayer = &checkpointLayer{
cm: cm,
checkpointDir: checkpointDir,
blockNumber: blockNumber,
root: blockRoot,
Expand All @@ -116,9 +121,11 @@ func newCheckpointLayer(checkpointDir string) (ckptLayer *checkpointLayer, err e
return ckptLayer, nil
}

// loop runs an event loop.
func (c *checkpointLayer) loop() {
var (
err error
kvDB ethdb.KeyValueStore
startOpenCheckpointTimestamp time.Time
endOpenCheckpointTimestamp time.Time
startCloseCheckpointTimestamp time.Time
Expand All @@ -134,49 +141,51 @@ func (c *checkpointLayer) loop() {
select {
case <-tryCloseTicker.C: // close db when a long time unused for reducing memory usage.
if c.isOpened.Load() && time.Now().Sub(c.lastUsedTimestamp) > time.Second*checkDBKeepaliveIntervalSecond {
startCloseCheckpointTimestamp = time.Now()
if err = c.checkpointDB.Close(); err != nil {
log.Warn("Failed to close checkpoint db", "error", err)
} else {
c.isOpened.Store(false)
}
endCloseCheckpointTimestamp = time.Now()
closeCheckpointTimer.Update(endCloseCheckpointTimestamp.Sub(startCloseCheckpointTimestamp))
}
case <-c.tryCloseCh: // close db when total opened db > max for reducing memory usage.
if c.isOpened.Load() {
startCloseCheckpointTimestamp = time.Now()
if err = c.checkpointDB.Close(); err != nil {
log.Warn("Failed to close checkpoint db", "error", err)
} else {
c.isOpened.Store(false)
}
endCloseCheckpointTimestamp = time.Now()
closeCheckpointTimer.Update(endCloseCheckpointTimestamp.Sub(startCloseCheckpointTimestamp))
}
case <-c.tryOpenCh: // lazy to open db for reducing memory usage.
c.lastUsedTimestamp = time.Now()
if !c.isOpened.Load() {
startOpenCheckpointTimestamp = time.Now()
c.checkpointDB, err = rawdb.Open(rawdb.OpenOptions{
Type: "pebble",
Directory: c.checkpointDir,
ReadOnly: true,
})
endOpenCheckpointTimestamp = time.Now()
openCheckpointTimer.Update(endOpenCheckpointTimestamp.Sub(startOpenCheckpointTimestamp))
if err != nil {
log.Warn("Failed to open raw db", "error", err)
} else {
kvDB, err = pebble.OpenCheckpointDB(c.checkpointDir, c.cm.sharedBlockCache, c.cm.sharedTableCache)
if err == nil {
c.checkpointDB = rawdb.NewDatabase(kvDB)
c.isOpened.Store(true)
} else {
log.Warn("Failed to open checkpoint kv db", "error", err)
}

endOpenCheckpointTimestamp = time.Now()
openCheckpointTimer.Update(endOpenCheckpointTimestamp.Sub(startOpenCheckpointTimestamp))
}
c.waitOpenCh <- struct{}{}
case <-c.tryCloseAndDeleteCh: // gc for reducing disk usage.
if c.isOpened.Load() {
startCloseCheckpointTimestamp = time.Now()
err = c.checkpointDB.Close()
endCloseCheckpointTimestamp = time.Now()
if err != nil {
log.Warn("Failed to close checkpoint db", "error", err)
} else {
c.isOpened.Store(false)
}
endCloseCheckpointTimestamp = time.Now()
log.Info("Close checkpoint db", "error", err,
"elapsed", common.PrettyDuration(endCloseCheckpointTimestamp.Sub(startCloseCheckpointTimestamp)))
closeCheckpointTimer.Update(endCloseCheckpointTimestamp.Sub(startCloseCheckpointTimestamp))
Expand All @@ -191,14 +200,18 @@ func (c *checkpointLayer) loop() {
deleteCheckpointTimer.Update(endDeleteCheckpointTimestamp.Sub(startDeleteCheckpointTimestamp))
}
c.waitCloseAndDeleteCh <- struct{}{}
case <-c.tryCloseAndStopCh: // close for stop
case <-c.tryCloseAndStopCh: // close for stop.
if c.isOpened.Load() {
c.checkpointDB.Close()
startCloseCheckpointTimestamp = time.Now()
err = c.checkpointDB.Close() // ignore this error.
endCloseCheckpointTimestamp = time.Now()
log.Info("Close checkpoint db", "error", err, "checkpoint_dir", c.checkpointDir,
"elapsed", common.PrettyDuration(endCloseCheckpointTimestamp.Sub(startCloseCheckpointTimestamp)))
closeCheckpointTimer.Update(endCloseCheckpointTimestamp.Sub(startCloseCheckpointTimestamp))
}
close(c.waitCloseAndStopCh)
return
}

}
}

Expand Down Expand Up @@ -295,6 +308,8 @@ type checkpointManager struct {
checkpointMap map[common.Hash]*checkpointLayer
stopCh chan struct{}
waitCloseCh chan struct{}
sharedBlockCache *pebbledb.Cache
sharedTableCache *pebbledb.TableCache
}

// newCheckpointManager returns a checkpoint manager instance.
Expand All @@ -309,6 +324,9 @@ func newCheckpointManager(db ethdb.Database, checkpointDir string, enableCheckPo
cm.stopCh = make(chan struct{})
cm.waitCloseCh = make(chan struct{})
cm.loadCheckpoint()

cm.sharedBlockCache = pebbledb.NewCache(8 * 1024 * 1024) // 8MB
cm.sharedTableCache = pebbledb.NewTableCache(cm.sharedBlockCache, runtime.NumCPU(), 4096)
go cm.loop()
cm.logDetailedInfo()
log.Info("Succeed to new checkpoint manager", "elapsed", common.PrettyDuration(time.Since(startTimestamp)))
Expand Down Expand Up @@ -352,12 +370,14 @@ func (cm *checkpointManager) needDoCheckpoint(currentCommitBlockNumber uint64) b
return false
}

// loop runs an event loop.
func (cm *checkpointManager) loop() {
gcCheckpointTicker := time.NewTicker(time.Second * gcCheckpointIntervalSecond)
defer gcCheckpointTicker.Stop()
printCheckpointStatTicker := time.NewTicker(time.Second * printCheckpointStatIntervalSecond)
defer printCheckpointStatTicker.Stop()
checkMaxDBOpenedTicker := time.NewTicker(time.Second * checkDBOpenedIntervalSecond)
defer checkMaxDBOpenedTicker.Stop()

for {
select {
Expand All @@ -381,22 +401,22 @@ func (cm *checkpointManager) checkMaxDBOpened() {
}

var (
totalOpennedNumber int64
toCloseCkptLayer *checkpointLayer
totalOpenedNumber int64
toCloseCkptLayer *checkpointLayer
)
cm.mux.RLock()
for _, ckptLayer := range cm.checkpointMap {
if ckptLayer.isOpened.Load() {
totalOpennedNumber = totalOpennedNumber + 1
totalOpenedNumber = totalOpenedNumber + 1
if toCloseCkptLayer == nil || ckptLayer.lastUsedTimestamp.Before(toCloseCkptLayer.lastUsedTimestamp) {
toCloseCkptLayer = ckptLayer
}
}
}
cm.mux.RUnlock()

openedCheckpointSizeGauge.Update(totalOpennedNumber)
if totalOpennedNumber > defaultMaxOpenedNumber {
openedCheckpointSizeGauge.Update(totalOpenedNumber)
if totalOpenedNumber > defaultMaxOpenedNumber {
toCloseCkptLayer.closeDB()
}
}
Expand Down Expand Up @@ -484,14 +504,14 @@ func (cm *checkpointManager) loadCheckpoint() error {
"error", err, "elapsed", common.PrettyDuration(time.Since(startTimestamp)))
}()

dir, err := ioutil.ReadDir(cm.checkpointDir)
entries, err := os.ReadDir(cm.checkpointDir)
if err != nil {
log.Warn("Failed to scan checkpoint dir", "dir", cm.checkpointDir, "error", err)
return err
}
for _, f := range dir {
for _, f := range entries {
if f.IsDir() {
if ckptLayer, err = newCheckpointLayer(cm.checkpointDir + "/" + f.Name()); err != nil {
if ckptLayer, err = newCheckpointLayer(cm, cm.checkpointDir+"/"+f.Name()); err != nil {
log.Warn("Failed to new checkpoint layer", "error", err)
return err
}
Expand Down Expand Up @@ -536,7 +556,7 @@ func (cm *checkpointManager) addCheckpoint(blockNumber uint64, blockRoot common.
return err
}

ckptLayer, err = newCheckpointLayer(subCheckpointDir)
ckptLayer, err = newCheckpointLayer(cm, subCheckpointDir)
if err != nil {
log.Warn("Failed to make checkpoint db", "error", err)
return err
Expand Down

0 comments on commit 8fcb8dd

Please sign in to comment.