diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 1c78bf8f4c..8ef8d8bc92 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -315,7 +315,7 @@ var ( EnableCheckpointFlag = &cli.BoolFlag{ Name: "pathdb.enablecheckpoint", Usage: "Enable trie db checkpoint for get withdraw-contract proof", - Value: true, + Value: false, Category: flags.StateCategory, } MaxCheckpointNumberFlag = &cli.Uint64Flag{ diff --git a/trie/triedb/pathdb/checkpoint_manager.go b/trie/triedb/pathdb/checkpoint_manager.go index 5468e6ffa0..607f9f3858 100644 --- a/trie/triedb/pathdb/checkpoint_manager.go +++ b/trie/triedb/pathdb/checkpoint_manager.go @@ -23,13 +23,17 @@ import ( ) const ( - // DefaultMaxCheckpointNumber is used to keep checkpoint number, default is 7 days for main-net. - DefaultMaxCheckpointNumber = 24 * 7 + // DefaultMaxCheckpointNumber is the default maximum number of checkpoints to keep, default is 3 days for main-net. + DefaultMaxCheckpointNumber = 24 * 3 // DefaultCheckpointDir is used to store checkpoint directory. DefaultCheckpointDir = "checkpoint" - - gcCheckpointIntervalSecond = 1 + // gcCheckpointIntervalSecond is used to control gc loop interval. + gcCheckpointIntervalSecond = 1 + // printCheckpointStatIntervalSecond is used to print checkpoint stat. printCheckpointStatIntervalSecond = 60 + // checkpointDBKeepaliveSecond is the idle time, in seconds, after which a checkpoint db that has not been used + // is closed to reduce memory usage. + checkpointDBKeepaliveSecond = 60 ) func encodeSubCheckpointDir(checkpointDir string, blockNumber uint64, blockRoot common.Hash) string { @@ -49,40 +53,148 @@ func decodeSubCheckpointDir(subCheckpointDir string) (blockNumber uint64, blockR return blockNumber, blockRoot, nil } -func makeCheckpointDB(checkpointDir string) (ckptLayer *checkpointLayer, err error) { +// checkpointLayer only provides node interface. 
+type checkpointLayer struct { + blockNumber uint64 + root common.Hash + checkpointDir string + checkpointDB ethdb.Database + isOpened atomic.Bool // whether the db is currently open; the db is opened lazily when it is used. + lastUsedTimestamp time.Time // time of the last open request; the db is closed if it is not used for a long time. + tryOpenCh chan struct{} // request to loop(): open the db if it is closed. + waitOpenCh chan struct{} // reply from loop(): the open attempt has finished. + tryCloseAndDeleteCh chan struct{} // request to loop(): close the db and delete its directory. + waitCloseAndDeleteCh chan struct{} // reply from loop(): the close-and-delete attempt has finished. + tryCloseAndStopCh chan struct{} // request to loop(): close the db and exit. + waitCloseAndStopCh chan struct{} // closed by loop() right before it exits. +} + +var _ layer = &checkpointLayer{} + +func newCheckpointLayer(checkpointDir string) (ckptLayer *checkpointLayer, err error) { var ( blockNumber uint64 blockRoot common.Hash - ckptDB ethdb.Database ) if blockNumber, blockRoot, err = decodeSubCheckpointDir(checkpointDir); err != nil { log.Warn("Failed to decode checkpoint dir", "dir", checkpointDir, "err", err) return nil, err } - if ckptDB, err = rawdb.Open(rawdb.OpenOptions{ - Type: "pebble", - Directory: checkpointDir, - ReadOnly: true, - }); err != nil { - log.Warn("Failed to open raw db", "err", err) - return nil, err - } ckptLayer = &checkpointLayer{ - blockNumber: blockNumber, - root: blockRoot, - checkpointDB: ckptDB, + checkpointDir: checkpointDir, + blockNumber: blockNumber, + root: blockRoot, + lastUsedTimestamp: time.Now(), } + ckptLayer.isOpened.Store(false) + ckptLayer.tryOpenCh = make(chan struct{}) + ckptLayer.waitOpenCh = make(chan struct{}) + ckptLayer.tryCloseAndDeleteCh = make(chan struct{}) + ckptLayer.waitCloseAndDeleteCh = make(chan struct{}) + ckptLayer.tryCloseAndStopCh = make(chan struct{}) + ckptLayer.waitCloseAndStopCh = make(chan struct{}) + go ckptLayer.loop() return ckptLayer, nil } -// only provide node interface -type checkpointLayer struct { - blockNumber uint64 - root common.Hash - checkpointDB ethdb.Database +func (c *checkpointLayer) loop() { + var ( + err error + startOpenCheckpointTimestamp time.Time + endOpenCheckpointTimestamp time.Time + startCloseCheckpointTimestamp time.Time + endCloseCheckpointTimestamp time.Time 
+ startDeleteCheckpointTimestamp time.Time + endDeleteCheckpointTimestamp time.Time + ) + + tryCloseTicker := time.NewTicker(time.Second * checkpointDBKeepaliveSecond) // NOTE(review): this ticker is never stopped here; consider calling Stop() when loop() returns. + + for { + select { + case <-tryCloseTicker.C: // close the db after it has been unused for a long time, to reduce memory usage. + if c.isOpened.Load() && time.Now().Sub(c.lastUsedTimestamp) > time.Second*checkpointDBKeepaliveSecond { + if err = c.checkpointDB.Close(); err != nil { + log.Warn("Failed to close checkpoint db", "err", err) + } else { + c.isOpened.Store(false) + } + } + case <-c.tryOpenCh: // lazily open the db to reduce memory usage. + c.lastUsedTimestamp = time.Now() + if !c.isOpened.Load() { + startOpenCheckpointTimestamp = time.Now() + c.checkpointDB, err = rawdb.Open(rawdb.OpenOptions{ + Type: "pebble", + Directory: c.checkpointDir, + ReadOnly: true, + }) + endOpenCheckpointTimestamp = time.Now() + openCheckpointTimer.Update(endOpenCheckpointTimestamp.Sub(startOpenCheckpointTimestamp)) + if err != nil { + log.Warn("Failed to open raw db", "err", err) + } else { + c.isOpened.Store(true) + } + + } + c.waitOpenCh <- struct{}{} + case <-c.tryCloseAndDeleteCh: // gc to reduce disk usage. 
+ if c.isOpened.Load() { + startCloseCheckpointTimestamp = time.Now() + err = c.checkpointDB.Close() + endCloseCheckpointTimestamp = time.Now() + if err != nil { + log.Warn("Failed to close checkpoint db", "err", err) + } else { + c.isOpened.Store(false) + } + log.Info("Close checkpoint db", "err", err, + "elapsed", common.PrettyDuration(endCloseCheckpointTimestamp.Sub(startCloseCheckpointTimestamp))) + closeCheckpointTimer.Update(endCloseCheckpointTimestamp.Sub(startCloseCheckpointTimestamp)) + } + if !c.isOpened.Load() { + startDeleteCheckpointTimestamp = time.Now() + err = os.RemoveAll(c.checkpointDir) + endDeleteCheckpointTimestamp = time.Now() + log.Info("Delete checkpoint db", "err", err, + "elapsed", common.PrettyDuration(endDeleteCheckpointTimestamp.Sub(startDeleteCheckpointTimestamp))) + deleteCheckpointTimer.Update(endDeleteCheckpointTimestamp.Sub(startDeleteCheckpointTimestamp)) + } + c.waitCloseAndDeleteCh <- struct{}{} + case <-c.tryCloseAndStopCh: // close the db (if open) and stop the loop. + if c.isOpened.Load() { + c.checkpointDB.Close() + } + close(c.waitCloseAndStopCh) + return + } + + } } +// waitDBOpen asks the background loop to open the db if it is not open, and waits for the attempt to finish. +// Note that this is a best-effort operation and the db may still fail to open. +func (c *checkpointLayer) waitDBOpen() { + c.tryOpenCh <- struct{}{} + <-c.waitOpenCh +} + +// waitDBCloseAndDelete closes the db and deletes the checkpoint directory; it waits for the background loop to finish the attempt. +// Note that this is a best-effort operation and the db may still fail to close and delete. NOTE(review): loop() does not return after this request; confirm the goroutine is stopped elsewhere. +func (c *checkpointLayer) waitDBCloseAndDelete() { + c.tryCloseAndDeleteCh <- struct{}{} + <-c.waitCloseAndDeleteCh +} + +// waitDBCloseAndStop closes the db and finishes the background loop. +func (c *checkpointLayer) waitDBCloseAndStop() { + c.tryCloseAndStopCh <- struct{}{} + <-c.waitCloseAndStopCh +} + +// Node implements the layer node interface. 
func (c *checkpointLayer) Node(owner common.Hash, path []byte, hash common.Hash) ([]byte, error) { var ( startTimestamp time.Time @@ -95,6 +207,11 @@ func (c *checkpointLayer) Node(owner common.Hash, path []byte, hash common.Hash) queryCheckpointTimer.UpdateSince(startTimestamp) }() + c.waitDBOpen() + if !c.isOpened.Load() { + return nil, errors.New("checkpoint db is not opened") + } + if owner == (common.Hash{}) { nBlob, nHash = rawdb.ReadAccountTrieNode(c.checkpointDB, path) } else { @@ -146,6 +263,9 @@ type checkpointManager struct { func newCheckpointManager(db ethdb.Database, checkpointDir string, enableCheckPoint bool, checkpointBlockInterval uint64, maxCheckpointNumber uint64) *checkpointManager { startTimestamp := time.Now() + if checkpointBlockInterval == 0 { + checkpointBlockInterval = DefaultMaxCheckpointNumber + } cm := &checkpointManager{db: db, checkpointDir: checkpointDir, checkpointBlockInterval: checkpointBlockInterval, maxCheckpointNumber: maxCheckpointNumber} cm.enableCheckPoint.Store(enableCheckPoint) cm.checkpointMap = make(map[common.Hash]*checkpointLayer) @@ -176,7 +296,7 @@ func (cm *checkpointManager) close() { cm.mux.Lock() defer cm.mux.Unlock() for _, ckptLayer := range cm.checkpointMap { - ckptLayer.checkpointDB.Close() + ckptLayer.waitDBCloseAndStop() } cm.checkpointMap = make(map[common.Hash]*checkpointLayer) log.Info("Succeed to close checkpoint manager", "elapsed", common.PrettyDuration(time.Since(startTimestamp))) @@ -209,12 +329,12 @@ func (cm *checkpointManager) loop() { } } -func (cm *checkpointManager) gcCheckpoint() error { +func (cm *checkpointManager) gcCheckpoint() { if !cm.enableCheckPoint.Load() { - return nil + return } if len(cm.checkpointMap) <= int(cm.maxCheckpointNumber) { - return nil + return } var ( @@ -241,24 +361,16 @@ func (cm *checkpointManager) gcCheckpoint() error { startTimestamp = time.Now() defer func() { gcCheckpointTimer.UpdateSince(startTimestamp) - log.Info("GC a checkpoint", "checkpoint_block_number", 
gcCkptLayer.blockNumber, - "gc_checkpoint_dir", gcCheckpointDir, - "elapsed", common.PrettyDuration(time.Since(startTimestamp)), - "err", err) + log.Info("GC one checkpoint", "checkpoint_block_number", gcCkptLayer.blockNumber, + "gc_checkpoint_dir", gcCheckpointDir, "err", err, + "elapsed", common.PrettyDuration(time.Since(startTimestamp))) }() cm.mux.Lock() delete(cm.checkpointMap, gcCkptLayer.rootHash()) cm.mux.Unlock() - - if err = gcCkptLayer.checkpointDB.Close(); err != nil { - log.Warn("Failed to close checkpoint db", "err", err) - return err - } - gcCheckpointDir = encodeSubCheckpointDir(cm.checkpointDir, gcCkptLayer.blockNumber, gcCkptLayer.root) - err = os.RemoveAll(gcCheckpointDir) + gcCkptLayer.waitDBCloseAndDelete() } - return nil } func (cm *checkpointManager) printCheckpointStat() error { @@ -270,7 +382,11 @@ func (cm *checkpointManager) printCheckpointStat() error { cm.mux.RLock() checkpointSizeGauge.Update(int64(len(cm.checkpointMap))) for _, ckptLayer := range cm.checkpointMap { - log.Info("Checkpoint detailed info", "checkpoint_block_number", ckptLayer.blockNumber, "checkpoint_block_root", ckptLayer.root.String()) + log.Info("Checkpoint detailed info", + "checkpoint_block_number", ckptLayer.blockNumber, + "checkpoint_block_root", ckptLayer.root.String(), + "checkpoint_is_opened", ckptLayer.isOpened.Load(), + "last_used_timestamp", ckptLayer.lastUsedTimestamp.String()) } cm.mux.RUnlock() return nil @@ -292,7 +408,7 @@ func (cm *checkpointManager) loadCheckpoint() error { defer func() { addCheckpointTimer.UpdateSince(startTimestamp) log.Info("Load checkpoint", "load_checkpoint_number", succeedLoadNumber, - "elapsed", common.PrettyDuration(time.Since(startTimestamp)), "err", err) + "err", err, "elapsed", common.PrettyDuration(time.Since(startTimestamp))) }() dir, err := ioutil.ReadDir(cm.checkpointDir) @@ -302,8 +418,8 @@ func (cm *checkpointManager) loadCheckpoint() error { } for _, f := range dir { if f.IsDir() { - if ckptLayer, err = 
makeCheckpointDB(cm.checkpointDir + "/" + f.Name()); err != nil { - log.Warn("Failed to make checkpoint", "err", err) + if ckptLayer, err = newCheckpointLayer(cm.checkpointDir + "/" + f.Name()); err != nil { + log.Warn("Failed to new checkpoint layer", "err", err) return err } cm.checkpointMap[ckptLayer.rootHash()] = ckptLayer @@ -320,26 +436,34 @@ func (cm *checkpointManager) addCheckpoint(blockNumber uint64, blockRoot common. } var ( - err error - startTimestamp time.Time - subCheckpointDir string - ckptLayer *checkpointLayer + err error + startTimestamp time.Time + startNewCheckpointTimestamp time.Time + endNewCheckpointTimestamp time.Time + subCheckpointDir string + ckptLayer *checkpointLayer ) startTimestamp = time.Now() defer func() { addCheckpointTimer.UpdateSince(startTimestamp) - log.Info("Add checkpoint", "block_number", blockNumber, "block_root", blockRoot.String(), - "elapsed", common.PrettyDuration(time.Since(startTimestamp)), "err", err) + newCheckpointTimer.Update(endNewCheckpointTimestamp.Sub(startNewCheckpointTimestamp)) + log.Info("Add a new checkpoint", "block_number", blockNumber, "block_root", blockRoot.String(), + "err", err, "elapsed", common.PrettyDuration(time.Since(startTimestamp)), + "new_elapsed", common.PrettyDuration(endNewCheckpointTimestamp.Sub(startNewCheckpointTimestamp))) }() subCheckpointDir = encodeSubCheckpointDir(cm.checkpointDir, blockNumber, blockRoot) - if err = cm.db.NewCheckpoint(subCheckpointDir); err != nil { + startNewCheckpointTimestamp = time.Now() + err = cm.db.NewCheckpoint(subCheckpointDir) + endNewCheckpointTimestamp = time.Now() + if err != nil { log.Warn("Failed to create checkpoint", "err", err) return err } - if ckptLayer, err = makeCheckpointDB(subCheckpointDir); err != nil { + ckptLayer, err = newCheckpointLayer(subCheckpointDir) + if err != nil { log.Warn("Failed to make checkpoint db", "err", err) return err } @@ -364,7 +488,7 @@ func (cm *checkpointManager) getCheckpointLayer(root common.Hash) (layer, 
error) startTimestamp = time.Now() defer func() { getCheckpointTimer.UpdateSince(startTimestamp) - log.Info("Get checkpoint", "root", root.String(), + log.Info("Get the checkpoint", "root", root.String(), "elapsed", common.PrettyDuration(time.Since(startTimestamp)), "err", err) }() diff --git a/trie/triedb/pathdb/metrics.go b/trie/triedb/pathdb/metrics.go index f37ad4de2d..c260a82976 100644 --- a/trie/triedb/pathdb/metrics.go +++ b/trie/triedb/pathdb/metrics.go @@ -64,9 +64,13 @@ var ( proposedBlockReaderLessDifflayer = metrics.NewRegisteredMeter("pathdb/nodebufferlist/proposedblockreader/lessdifflayer", nil) // checkpoint metrics - checkpointSizeGauge = metrics.NewRegisteredGauge("pathdb/checkpoint/size", nil) - addCheckpointTimer = metrics.NewRegisteredTimer("pathdb/addcheckpoint/time", nil) - getCheckpointTimer = metrics.NewRegisteredTimer("pathdb/getcheckpoint/time", nil) - gcCheckpointTimer = metrics.NewRegisteredTimer("pathdb/gccheckpoint/time", nil) - queryCheckpointTimer = metrics.NewRegisteredTimer("pathdb/querycheckpoint/time", nil) + checkpointSizeGauge = metrics.NewRegisteredGauge("pathdb/checkpoint/size", nil) + addCheckpointTimer = metrics.NewRegisteredTimer("pathdb/addcheckpoint/time", nil) + newCheckpointTimer = metrics.NewRegisteredTimer("pathdb/newcheckpoint/time", nil) + openCheckpointTimer = metrics.NewRegisteredTimer("pathdb/opencheckpoint/time", nil) + getCheckpointTimer = metrics.NewRegisteredTimer("pathdb/getcheckpoint/time", nil) + gcCheckpointTimer = metrics.NewRegisteredTimer("pathdb/gccheckpoint/time", nil) + closeCheckpointTimer = metrics.NewRegisteredTimer("pathdb/closecheckpoint/time", nil) + deleteCheckpointTimer = metrics.NewRegisteredTimer("pathdb/deletecheckpoint/time", nil) + queryCheckpointTimer = metrics.NewRegisteredTimer("pathdb/querycheckpoint/time", nil) ) diff --git a/trie/triedb/pathdb/nodebufferlist.go b/trie/triedb/pathdb/nodebufferlist.go index 4cb03d12c3..9643a29577 100644 --- a/trie/triedb/pathdb/nodebufferlist.go 
+++ b/trie/triedb/pathdb/nodebufferlist.go @@ -258,6 +258,7 @@ func (nf *nodebufferlist) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, _ = nf.popBack() if nf.checkpointManager.needDoCheckpoint(buffer.block) { + // todo: check persistID := nf.persistID + nf.base.layers err := nf.base.flush(nf.db, nf.clean, persistID) if err != nil { @@ -586,7 +587,7 @@ func (nf *nodebufferlist) proposedBlockReader(blockRoot common.Hash) (layer, err if err != nil { context = append(context, []interface{}{"err", err}...) log.Error("proposed block state is not available", context...) - return nil, fmt.Errorf("proposed block proof state %#x is not available", blockRoot) + return nil, fmt.Errorf("proposed block proof state %s is not available", blockRoot.String()) // %s, not %#x: String() is already printable, %#x would hex-encode the string's bytes. } return ckptLayer, nil }