From 8fcb8dd98a011f9f72230280f85bed95c44acca3 Mon Sep 17 00:00:00 2001 From: "will@2012" Date: Mon, 1 Apr 2024 16:37:43 +0800 Subject: [PATCH] feat: checkpoint use shared blockcache and tablecache --- ethdb/pebble/pebble.go | 52 +++++++++++++++++ trie/triedb/pathdb/checkpoint_manager.go | 74 +++++++++++++++--------- 2 files changed, 99 insertions(+), 27 deletions(-) diff --git a/ethdb/pebble/pebble.go b/ethdb/pebble/pebble.go index ec0fd31e79..c069e39357 100644 --- a/ethdb/pebble/pebble.go +++ b/ethdb/pebble/pebble.go @@ -674,3 +674,55 @@ func (d *Database) NewCheckpoint(destDir string) error { opts = append(opts, opt) return d.db.Checkpoint(destDir, opts...) } + +// OpenCheckpointDB returns a wrapped checkpoint pebble DB object. The namespace is the prefix that the +// metrics reporting should use for surfacing internal stats. +func OpenCheckpointDB(file string, blockCache *pebble.Cache, tableCache *pebble.TableCache) (*Database, error) { + namespace := "checkpoint" + logger := log.New(namespace, file) + logger.Info("Open checkpoint db", "checkpoint_dir", file) + + db := &Database{ + fn: file, + log: logger, + quitChan: make(chan chan error), + } + opt := &pebble.Options{ + Cache: blockCache, + TableCache: tableCache, + MaxConcurrentCompactions: func() int { return 1 }, + ReadOnly: true, + EventListener: &pebble.EventListener{ + CompactionBegin: db.onCompactionBegin, + CompactionEnd: db.onCompactionEnd, + WriteStallBegin: db.onWriteStallBegin, + WriteStallEnd: db.onWriteStallEnd, + }, + Logger: panicLogger{}, // TODO(karalabe): Delete when this is upstreamed in Pebble + } + + // Open the db and recover any potential corruptions + innerDB, err := pebble.Open(file, opt) + if err != nil { + return nil, err + } + db.db = innerDB + + db.compTimeMeter = metrics.NewRegisteredMeter(namespace+"compact/time", nil) + db.compReadMeter = metrics.NewRegisteredMeter(namespace+"compact/input", nil) + db.compWriteMeter = metrics.NewRegisteredMeter(namespace+"compact/output", nil) + db.diskSizeGauge = metrics.NewRegisteredGauge(namespace+"disk/size", nil) + db.diskReadMeter = metrics.NewRegisteredMeter(namespace+"disk/read", nil) + db.diskWriteMeter = metrics.NewRegisteredMeter(namespace+"disk/write", nil) + db.writeDelayMeter = metrics.NewRegisteredMeter(namespace+"compact/writedelay/duration", nil) + db.writeDelayNMeter = metrics.NewRegisteredMeter(namespace+"compact/writedelay/counter", nil) + db.memCompGauge = metrics.NewRegisteredGauge(namespace+"compact/memory", nil) + db.level0CompGauge = metrics.NewRegisteredGauge(namespace+"compact/level0", nil) + db.nonlevel0CompGauge = metrics.NewRegisteredGauge(namespace+"compact/nonlevel0", nil) + db.seekCompGauge = metrics.NewRegisteredGauge(namespace+"compact/seek", nil) + db.manualMemAllocGauge = metrics.NewRegisteredGauge(namespace+"memory/manualalloc", nil) + + // Start up the metrics gathering and return + go db.meter(metricsGatheringInterval, namespace) + return db, nil +} diff --git a/trie/triedb/pathdb/checkpoint_manager.go b/trie/triedb/pathdb/checkpoint_manager.go index fbae6f8ed0..ee7276a369 100644 --- a/trie/triedb/pathdb/checkpoint_manager.go +++ b/trie/triedb/pathdb/checkpoint_manager.go @@ -4,10 +4,10 @@ import ( "errors" "fmt" "io" - "io/ioutil" "math" "os" "path/filepath" + "runtime" "strconv" "strings" "sync" @@ -17,9 +17,12 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/ethdb/pebble" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/trie/trienode" "github.com/ethereum/go-ethereum/trie/triestate" + + pebbledb "github.com/cockroachdb/pebble" ) const ( @@ -43,7 +46,7 @@ const ( checkDBOpenedIntervalSecond = 1 // defaultMaxOpenedNumber is max opened number. - defaultMaxOpenedNumber = 3 + defaultMaxOpenedNumber = 10 ) // encodeSubCheckpointDir encodes a completed checkpoint directory. @@ -70,6 +73,7 @@ func decodeSubCheckpointDir(subCheckpointDir string) (blockNumber uint64, blockR // checkpointLayer only provides node interface. type checkpointLayer struct { + cm *checkpointManager blockNumber uint64 root common.Hash checkpointDir string @@ -88,7 +92,7 @@ type checkpointLayer struct { var _ layer = &checkpointLayer{} // newCheckpointLayer is used to return make a checkpoint instance. -func newCheckpointLayer(checkpointDir string) (ckptLayer *checkpointLayer, err error) { +func newCheckpointLayer(cm *checkpointManager, checkpointDir string) (ckptLayer *checkpointLayer, err error) { var ( blockNumber uint64 blockRoot common.Hash @@ -99,6 +103,7 @@ func newCheckpointLayer(checkpointDir string) (ckptLayer *checkpointLayer, err e return nil, err } ckptLayer = &checkpointLayer{ + cm: cm, checkpointDir: checkpointDir, blockNumber: blockNumber, root: blockRoot, @@ -116,9 +121,11 @@ func newCheckpointLayer(checkpointDir string) (ckptLayer *checkpointLayer, err e return ckptLayer, nil } +// loop runs an event loop. func (c *checkpointLayer) loop() { var ( err error + kvDB ethdb.KeyValueStore startOpenCheckpointTimestamp time.Time endOpenCheckpointTimestamp time.Time startCloseCheckpointTimestamp time.Time @@ -134,49 +141,51 @@ func (c *checkpointLayer) loop() { select { case <-tryCloseTicker.C: // close db when a long time unused for reducing memory usage. if c.isOpened.Load() && time.Now().Sub(c.lastUsedTimestamp) > time.Second*checkDBKeepaliveIntervalSecond { + startCloseCheckpointTimestamp = time.Now() if err = c.checkpointDB.Close(); err != nil { log.Warn("Failed to close checkpoint db", "error", err) } else { c.isOpened.Store(false) } + endCloseCheckpointTimestamp = time.Now() + closeCheckpointTimer.Update(endCloseCheckpointTimestamp.Sub(startCloseCheckpointTimestamp)) } case <-c.tryCloseCh: // close db when total opened db > max for reducing memory usage. if c.isOpened.Load() { + startCloseCheckpointTimestamp = time.Now() if err = c.checkpointDB.Close(); err != nil { log.Warn("Failed to close checkpoint db", "error", err) } else { c.isOpened.Store(false) } + endCloseCheckpointTimestamp = time.Now() + closeCheckpointTimer.Update(endCloseCheckpointTimestamp.Sub(startCloseCheckpointTimestamp)) } case <-c.tryOpenCh: // lazy to open db for reducing memory usage. c.lastUsedTimestamp = time.Now() if !c.isOpened.Load() { startOpenCheckpointTimestamp = time.Now() - c.checkpointDB, err = rawdb.Open(rawdb.OpenOptions{ - Type: "pebble", - Directory: c.checkpointDir, - ReadOnly: true, - }) - endOpenCheckpointTimestamp = time.Now() - openCheckpointTimer.Update(endOpenCheckpointTimestamp.Sub(startOpenCheckpointTimestamp)) - if err != nil { - log.Warn("Failed to open raw db", "error", err) - } else { + kvDB, err = pebble.OpenCheckpointDB(c.checkpointDir, c.cm.sharedBlockCache, c.cm.sharedTableCache) + if err == nil { + c.checkpointDB = rawdb.NewDatabase(kvDB) c.isOpened.Store(true) + } else { + log.Warn("Failed to open checkpoint kv db", "error", err) } - + endOpenCheckpointTimestamp = time.Now() + openCheckpointTimer.Update(endOpenCheckpointTimestamp.Sub(startOpenCheckpointTimestamp)) } c.waitOpenCh <- struct{}{} case <-c.tryCloseAndDeleteCh: // gc for reducing disk usage. if c.isOpened.Load() { startCloseCheckpointTimestamp = time.Now() err = c.checkpointDB.Close() - endCloseCheckpointTimestamp = time.Now() if err != nil { log.Warn("Failed to close checkpoint db", "error", err) } else { c.isOpened.Store(false) } + endCloseCheckpointTimestamp = time.Now() log.Info("Close checkpoint db", "error", err, "elapsed", common.PrettyDuration(endCloseCheckpointTimestamp.Sub(startCloseCheckpointTimestamp))) closeCheckpointTimer.Update(endCloseCheckpointTimestamp.Sub(startCloseCheckpointTimestamp)) @@ -191,14 +200,18 @@ func (c *checkpointLayer) loop() { deleteCheckpointTimer.Update(endDeleteCheckpointTimestamp.Sub(startDeleteCheckpointTimestamp)) } c.waitCloseAndDeleteCh <- struct{}{} - case <-c.tryCloseAndStopCh: // close for stop + case <-c.tryCloseAndStopCh: // close for stop. if c.isOpened.Load() { - c.checkpointDB.Close() + startCloseCheckpointTimestamp = time.Now() + err = c.checkpointDB.Close() // ignore this error. + endCloseCheckpointTimestamp = time.Now() + log.Info("Close checkpoint db", "error", err, "checkpoint_dir", c.checkpointDir, + "elapsed", common.PrettyDuration(endCloseCheckpointTimestamp.Sub(startCloseCheckpointTimestamp))) + closeCheckpointTimer.Update(endCloseCheckpointTimestamp.Sub(startCloseCheckpointTimestamp)) } close(c.waitCloseAndStopCh) return } - } } @@ -295,6 +308,8 @@ type checkpointManager struct { checkpointMap map[common.Hash]*checkpointLayer stopCh chan struct{} waitCloseCh chan struct{} + sharedBlockCache *pebbledb.Cache + sharedTableCache *pebbledb.TableCache } // newCheckpointManager returns a checkpoint manager instance. @@ -309,6 +324,9 @@ func newCheckpointManager(db ethdb.Database, checkpointDir string, enableCheckPo cm.stopCh = make(chan struct{}) cm.waitCloseCh = make(chan struct{}) cm.loadCheckpoint() + + cm.sharedBlockCache = pebbledb.NewCache(8 * 1024 * 1024) // 8MB + cm.sharedTableCache = pebbledb.NewTableCache(cm.sharedBlockCache, runtime.NumCPU(), 4096) go cm.loop() cm.logDetailedInfo() log.Info("Succeed to new checkpoint manager", "elapsed", common.PrettyDuration(time.Since(startTimestamp))) @@ -352,12 +370,14 @@ func (cm *checkpointManager) needDoCheckpoint(currentCommitBlockNumber uint64) b return false } +// loop runs an event loop. func (cm *checkpointManager) loop() { gcCheckpointTicker := time.NewTicker(time.Second * gcCheckpointIntervalSecond) defer gcCheckpointTicker.Stop() printCheckpointStatTicker := time.NewTicker(time.Second * printCheckpointStatIntervalSecond) defer printCheckpointStatTicker.Stop() checkMaxDBOpenedTicker := time.NewTicker(time.Second * checkDBOpenedIntervalSecond) + defer checkMaxDBOpenedTicker.Stop() for { select { @@ -381,13 +401,13 @@ func (cm *checkpointManager) checkMaxDBOpened() { } var ( - totalOpennedNumber int64 - toCloseCkptLayer *checkpointLayer + totalOpenedNumber int64 + toCloseCkptLayer *checkpointLayer ) cm.mux.RLock() for _, ckptLayer := range cm.checkpointMap { if ckptLayer.isOpened.Load() { - totalOpennedNumber = totalOpennedNumber + 1 + totalOpenedNumber = totalOpenedNumber + 1 if toCloseCkptLayer == nil || ckptLayer.lastUsedTimestamp.Before(toCloseCkptLayer.lastUsedTimestamp) { toCloseCkptLayer = ckptLayer } @@ -395,8 +415,8 @@ func (cm *checkpointManager) checkMaxDBOpened() { } cm.mux.RUnlock() - openedCheckpointSizeGauge.Update(totalOpennedNumber) - if totalOpennedNumber > defaultMaxOpenedNumber { + openedCheckpointSizeGauge.Update(totalOpenedNumber) + if totalOpenedNumber > defaultMaxOpenedNumber { toCloseCkptLayer.closeDB() } } @@ -484,14 +504,14 @@ func (cm *checkpointManager) loadCheckpoint() error { "error", err, "elapsed", common.PrettyDuration(time.Since(startTimestamp))) }() - dir, err := ioutil.ReadDir(cm.checkpointDir) + entries, err := os.ReadDir(cm.checkpointDir) if err != nil { log.Warn("Failed to scan checkpoint dir", "dir", cm.checkpointDir, "error", err) return err } - for _, f := range dir { + for _, f := range entries { if f.IsDir() { - if ckptLayer, err = newCheckpointLayer(cm.checkpointDir + "/" + f.Name()); err != nil { + if ckptLayer, err = newCheckpointLayer(cm, cm.checkpointDir+"/"+f.Name()); err != nil { log.Warn("Failed to new checkpoint layer", "error", err) return err } @@ -536,7 +556,7 @@ func (cm *checkpointManager) addCheckpoint(blockNumber uint64, blockRoot common. return err } - ckptLayer, err = newCheckpointLayer(subCheckpointDir) + ckptLayer, err = newCheckpointLayer(cm, subCheckpointDir) if err != nil { log.Warn("Failed to make checkpoint db", "error", err) return err