Skip to content

Commit

Permalink
chore: polish checkpoint skeleton
Browse files Browse the repository at this point in the history
  • Loading branch information
will@2012 committed Mar 28, 2024
1 parent 88737d7 commit e8291f4
Show file tree
Hide file tree
Showing 4 changed files with 187 additions and 58 deletions.
2 changes: 1 addition & 1 deletion cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ var (
EnableCheckpointFlag = &cli.BoolFlag{
Name: "pathdb.enablecheckpoint",
Usage: "Enable trie db checkpoint for get withdraw-contract proof",
Value: true,
Value: false,
Category: flags.StateCategory,
}
MaxCheckpointNumberFlag = &cli.Uint64Flag{
Expand Down
226 changes: 175 additions & 51 deletions trie/triedb/pathdb/checkpoint_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,17 @@ import (
)

const (
// DefaultMaxCheckpointNumber is used to keep checkpoint number, default is 7 days for main-net.
DefaultMaxCheckpointNumber = 24 * 7
// DefaultMaxCheckpointNumber is used to keep checkpoint number, default is 3 days for main-net.
DefaultMaxCheckpointNumber = 24 * 3
// DefaultCheckpointDir is used to store checkpoint directory.
DefaultCheckpointDir = "checkpoint"

gcCheckpointIntervalSecond = 1
// gcCheckpointIntervalSecond is used to control gc loop interval.
gcCheckpointIntervalSecond = 1
// printCheckpointStatIntervalSecond is used to print checkpoint stat.
printCheckpointStatIntervalSecond = 60
// checkpointDBKeepaliveSecond is used to check db which has been not used for a long time,
// and close it for reducing memory.
checkpointDBKeepaliveSecond = 60
)

func encodeSubCheckpointDir(checkpointDir string, blockNumber uint64, blockRoot common.Hash) string {
Expand All @@ -49,40 +53,148 @@ func decodeSubCheckpointDir(subCheckpointDir string) (blockNumber uint64, blockR
return blockNumber, blockRoot, nil
}

func makeCheckpointDB(checkpointDir string) (ckptLayer *checkpointLayer, err error) {
// checkpointLayer only provides node interface.
type checkpointLayer struct {
blockNumber uint64
root common.Hash
checkpointDir string
checkpointDB ethdb.Database
isOpened atomic.Bool // lazy to open db when it is used.
lastUsedTimestamp time.Time // db will be closed if it is not used for a long time.
tryOpenCh chan struct{}
waitOpenCh chan struct{}
tryCloseAndDeleteCh chan struct{}
waitCloseAndDeleteCh chan struct{}
tryCloseAndStopCh chan struct{}
waitCloseAndStopCh chan struct{}
}

var _ layer = &checkpointLayer{}

func newCheckpointLayer(checkpointDir string) (ckptLayer *checkpointLayer, err error) {
var (
blockNumber uint64
blockRoot common.Hash
ckptDB ethdb.Database
)

if blockNumber, blockRoot, err = decodeSubCheckpointDir(checkpointDir); err != nil {
log.Warn("Failed to decode checkpoint dir", "dir", checkpointDir, "err", err)
return nil, err
}
if ckptDB, err = rawdb.Open(rawdb.OpenOptions{
Type: "pebble",
Directory: checkpointDir,
ReadOnly: true,
}); err != nil {
log.Warn("Failed to open raw db", "err", err)
return nil, err
}
ckptLayer = &checkpointLayer{
blockNumber: blockNumber,
root: blockRoot,
checkpointDB: ckptDB,
checkpointDir: checkpointDir,
blockNumber: blockNumber,
root: blockRoot,
lastUsedTimestamp: time.Now(),
}
ckptLayer.isOpened.Store(false)
ckptLayer.tryOpenCh = make(chan struct{})
ckptLayer.waitOpenCh = make(chan struct{})
ckptLayer.tryCloseAndDeleteCh = make(chan struct{})
ckptLayer.waitCloseAndDeleteCh = make(chan struct{})
ckptLayer.tryCloseAndStopCh = make(chan struct{})
ckptLayer.waitCloseAndStopCh = make(chan struct{})
go ckptLayer.loop()
return ckptLayer, nil
}

// only provide node interface
type checkpointLayer struct {
blockNumber uint64
root common.Hash
checkpointDB ethdb.Database
func (c *checkpointLayer) loop() {
var (
err error
startOpenCheckpointTimestamp time.Time
endOpenCheckpointTimestamp time.Time
startCloseCheckpointTimestamp time.Time
endCloseCheckpointTimestamp time.Time
startDeleteCheckpointTimestamp time.Time
endDeleteCheckpointTimestamp time.Time
)

tryCloseTicker := time.NewTicker(time.Second * checkpointDBKeepaliveSecond)

for {
select {
case <-tryCloseTicker.C: // close db when a long time unused for reducing memory usage.
if c.isOpened.Load() && time.Now().Sub(c.lastUsedTimestamp) > time.Second*checkpointDBKeepaliveSecond {
if err = c.checkpointDB.Close(); err != nil {
log.Warn("Failed to close checkpoint db", "err", err)
} else {
c.isOpened.Store(false)
}
}
case <-c.tryOpenCh: // lazy to open db for reducing memory usage.
c.lastUsedTimestamp = time.Now()
if !c.isOpened.Load() {
startOpenCheckpointTimestamp = time.Now()
c.checkpointDB, err = rawdb.Open(rawdb.OpenOptions{
Type: "pebble",
Directory: c.checkpointDir,
ReadOnly: true,
})
endOpenCheckpointTimestamp = time.Now()
openCheckpointTimer.Update(endOpenCheckpointTimestamp.Sub(startOpenCheckpointTimestamp))
if err != nil {
log.Warn("Failed to open raw db", "err", err)
} else {
c.isOpened.Store(true)
}

}
c.waitOpenCh <- struct{}{}
case <-c.tryCloseAndDeleteCh: // gc for reducing disk usage.
if c.isOpened.Load() {
startCloseCheckpointTimestamp = time.Now()
err = c.checkpointDB.Close()
endCloseCheckpointTimestamp = time.Now()
if err != nil {
log.Warn("Failed to close checkpoint db", "err", err)
} else {
c.isOpened.Store(false)
}
log.Info("Close checkpoint db", "err", err,
"elapsed", common.PrettyDuration(endCloseCheckpointTimestamp.Sub(startCloseCheckpointTimestamp)))
closeCheckpointTimer.Update(endCloseCheckpointTimestamp.Sub(startCloseCheckpointTimestamp))
}
if !c.isOpened.Load() {
startDeleteCheckpointTimestamp = time.Now()
err = os.RemoveAll(c.checkpointDir)
endDeleteCheckpointTimestamp = time.Now()
log.Info("Delete checkpoint db", "err", err,
"elapsed", common.PrettyDuration(endDeleteCheckpointTimestamp.Sub(startDeleteCheckpointTimestamp)))
deleteCheckpointTimer.Update(endDeleteCheckpointTimestamp.Sub(startDeleteCheckpointTimestamp))
}
c.waitCloseAndDeleteCh <- struct{}{}
case <-c.tryCloseAndStopCh: // close for stop
if c.isOpened.Load() {
c.checkpointDB.Close()
}
close(c.waitCloseAndStopCh)
return
}

}
}

// waitDBOpen opens the db when the db is not open.
// Note that this is a best-effort operation and the db may still fail to open.
func (c *checkpointLayer) waitDBOpen() {
c.tryOpenCh <- struct{}{}
<-c.waitOpenCh
}

// waitDBCloseAndDelete closes the db and deletes the checkpoint directory when GC checkpoint.
// Note that this is a best-effort operation and the db may still fail to close and delete.
func (c *checkpointLayer) waitDBCloseAndDelete() {
c.tryCloseAndDeleteCh <- struct{}{}
<-c.waitCloseAndDeleteCh
}

// waitDBCloseAndStop closes the db and finish the background loop.
func (c *checkpointLayer) waitDBCloseAndStop() {
c.tryCloseAndStopCh <- struct{}{}
<-c.waitCloseAndStopCh
}

// Node implements the layer node interface.
func (c *checkpointLayer) Node(owner common.Hash, path []byte, hash common.Hash) ([]byte, error) {
var (
startTimestamp time.Time
Expand All @@ -95,6 +207,11 @@ func (c *checkpointLayer) Node(owner common.Hash, path []byte, hash common.Hash)
queryCheckpointTimer.UpdateSince(startTimestamp)
}()

c.waitDBOpen()
if !c.isOpened.Load() {
return nil, errors.New("checkpoint db is not opened")
}

if owner == (common.Hash{}) {
nBlob, nHash = rawdb.ReadAccountTrieNode(c.checkpointDB, path)
} else {
Expand Down Expand Up @@ -146,6 +263,9 @@ type checkpointManager struct {

func newCheckpointManager(db ethdb.Database, checkpointDir string, enableCheckPoint bool, checkpointBlockInterval uint64, maxCheckpointNumber uint64) *checkpointManager {
startTimestamp := time.Now()
if checkpointBlockInterval == 0 {
checkpointBlockInterval = DefaultMaxCheckpointNumber
}
cm := &checkpointManager{db: db, checkpointDir: checkpointDir, checkpointBlockInterval: checkpointBlockInterval, maxCheckpointNumber: maxCheckpointNumber}
cm.enableCheckPoint.Store(enableCheckPoint)
cm.checkpointMap = make(map[common.Hash]*checkpointLayer)
Expand Down Expand Up @@ -176,7 +296,7 @@ func (cm *checkpointManager) close() {
cm.mux.Lock()
defer cm.mux.Unlock()
for _, ckptLayer := range cm.checkpointMap {
ckptLayer.checkpointDB.Close()
ckptLayer.waitDBCloseAndStop()
}
cm.checkpointMap = make(map[common.Hash]*checkpointLayer)
log.Info("Succeed to close checkpoint manager", "elapsed", common.PrettyDuration(time.Since(startTimestamp)))
Expand Down Expand Up @@ -209,12 +329,12 @@ func (cm *checkpointManager) loop() {
}
}

func (cm *checkpointManager) gcCheckpoint() error {
func (cm *checkpointManager) gcCheckpoint() {
if !cm.enableCheckPoint.Load() {
return nil
return
}
if len(cm.checkpointMap) <= int(cm.maxCheckpointNumber) {
return nil
return
}

var (
Expand All @@ -241,24 +361,16 @@ func (cm *checkpointManager) gcCheckpoint() error {
startTimestamp = time.Now()
defer func() {
gcCheckpointTimer.UpdateSince(startTimestamp)
log.Info("GC a checkpoint", "checkpoint_block_number", gcCkptLayer.blockNumber,
"gc_checkpoint_dir", gcCheckpointDir,
"elapsed", common.PrettyDuration(time.Since(startTimestamp)),
"err", err)
log.Info("GC one checkpoint", "checkpoint_block_number", gcCkptLayer.blockNumber,
"gc_checkpoint_dir", gcCheckpointDir, "err", err,
"elapsed", common.PrettyDuration(time.Since(startTimestamp)))
}()

cm.mux.Lock()
delete(cm.checkpointMap, gcCkptLayer.rootHash())
cm.mux.Unlock()

if err = gcCkptLayer.checkpointDB.Close(); err != nil {
log.Warn("Failed to close checkpoint db", "err", err)
return err
}
gcCheckpointDir = encodeSubCheckpointDir(cm.checkpointDir, gcCkptLayer.blockNumber, gcCkptLayer.root)
err = os.RemoveAll(gcCheckpointDir)
gcCkptLayer.waitDBCloseAndDelete()
}
return nil
}

func (cm *checkpointManager) printCheckpointStat() error {
Expand All @@ -270,7 +382,11 @@ func (cm *checkpointManager) printCheckpointStat() error {
cm.mux.RLock()
checkpointSizeGauge.Update(int64(len(cm.checkpointMap)))
for _, ckptLayer := range cm.checkpointMap {
log.Info("Checkpoint detailed info", "checkpoint_block_number", ckptLayer.blockNumber, "checkpoint_block_root", ckptLayer.root.String())
log.Info("Checkpoint detailed info",
"checkpoint_block_number", ckptLayer.blockNumber,
"checkpoint_block_root", ckptLayer.root.String(),
"checkpoint_is_opened", ckptLayer.isOpened.Load(),
"last_used_timestamp", ckptLayer.lastUsedTimestamp.String())
}
cm.mux.RUnlock()
return nil
Expand All @@ -292,7 +408,7 @@ func (cm *checkpointManager) loadCheckpoint() error {
defer func() {
addCheckpointTimer.UpdateSince(startTimestamp)
log.Info("Load checkpoint", "load_checkpoint_number", succeedLoadNumber,
"elapsed", common.PrettyDuration(time.Since(startTimestamp)), "err", err)
"err", err, "elapsed", common.PrettyDuration(time.Since(startTimestamp)))
}()

dir, err := ioutil.ReadDir(cm.checkpointDir)
Expand All @@ -302,8 +418,8 @@ func (cm *checkpointManager) loadCheckpoint() error {
}
for _, f := range dir {
if f.IsDir() {
if ckptLayer, err = makeCheckpointDB(cm.checkpointDir + "/" + f.Name()); err != nil {
log.Warn("Failed to make checkpoint", "err", err)
if ckptLayer, err = newCheckpointLayer(cm.checkpointDir + "/" + f.Name()); err != nil {
log.Warn("Failed to new checkpoint layer", "err", err)
return err
}
cm.checkpointMap[ckptLayer.rootHash()] = ckptLayer
Expand All @@ -320,26 +436,34 @@ func (cm *checkpointManager) addCheckpoint(blockNumber uint64, blockRoot common.
}

var (
err error
startTimestamp time.Time
subCheckpointDir string
ckptLayer *checkpointLayer
err error
startTimestamp time.Time
startNewCheckpointTimestamp time.Time
endNewCheckpointTimestamp time.Time
subCheckpointDir string
ckptLayer *checkpointLayer
)

startTimestamp = time.Now()
defer func() {
addCheckpointTimer.UpdateSince(startTimestamp)
log.Info("Add checkpoint", "block_number", blockNumber, "block_root", blockRoot.String(),
"elapsed", common.PrettyDuration(time.Since(startTimestamp)), "err", err)
newCheckpointTimer.Update(endNewCheckpointTimestamp.Sub(startNewCheckpointTimestamp))
log.Info("Add a new checkpoint", "block_number", blockNumber, "block_root", blockRoot.String(),
"err", err, "elapsed", common.PrettyDuration(time.Since(startTimestamp)),
"new_elapsed", common.PrettyDuration(endNewCheckpointTimestamp.Sub(startNewCheckpointTimestamp)))
}()

subCheckpointDir = encodeSubCheckpointDir(cm.checkpointDir, blockNumber, blockRoot)
if err = cm.db.NewCheckpoint(subCheckpointDir); err != nil {
startNewCheckpointTimestamp = time.Now()
err = cm.db.NewCheckpoint(subCheckpointDir)
endNewCheckpointTimestamp = time.Now()
if err != nil {
log.Warn("Failed to create checkpoint", "err", err)
return err
}

if ckptLayer, err = makeCheckpointDB(subCheckpointDir); err != nil {
ckptLayer, err = newCheckpointLayer(subCheckpointDir)
if err != nil {
log.Warn("Failed to make checkpoint db", "err", err)
return err
}
Expand All @@ -364,7 +488,7 @@ func (cm *checkpointManager) getCheckpointLayer(root common.Hash) (layer, error)
startTimestamp = time.Now()
defer func() {
getCheckpointTimer.UpdateSince(startTimestamp)
log.Info("Get checkpoint", "root", root.String(),
log.Info("Get the checkpoint", "root", root.String(),
"elapsed", common.PrettyDuration(time.Since(startTimestamp)), "err", err)
}()

Expand Down
14 changes: 9 additions & 5 deletions trie/triedb/pathdb/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,13 @@ var (
proposedBlockReaderLessDifflayer = metrics.NewRegisteredMeter("pathdb/nodebufferlist/proposedblockreader/lessdifflayer", nil)

// checkpoint metrics
checkpointSizeGauge = metrics.NewRegisteredGauge("pathdb/checkpoint/size", nil)
addCheckpointTimer = metrics.NewRegisteredTimer("pathdb/addcheckpoint/time", nil)
getCheckpointTimer = metrics.NewRegisteredTimer("pathdb/getcheckpoint/time", nil)
gcCheckpointTimer = metrics.NewRegisteredTimer("pathdb/gccheckpoint/time", nil)
queryCheckpointTimer = metrics.NewRegisteredTimer("pathdb/querycheckpoint/time", nil)
checkpointSizeGauge = metrics.NewRegisteredGauge("pathdb/checkpoint/size", nil)
addCheckpointTimer = metrics.NewRegisteredTimer("pathdb/addcheckpoint/time", nil)
newCheckpointTimer = metrics.NewRegisteredTimer("pathdb/newcheckpoint/time", nil)
openCheckpointTimer = metrics.NewRegisteredTimer("pathdb/opencheckpoint/time", nil)
getCheckpointTimer = metrics.NewRegisteredTimer("pathdb/getcheckpoint/time", nil)
gcCheckpointTimer = metrics.NewRegisteredTimer("pathdb/gccheckpoint/time", nil)
closeCheckpointTimer = metrics.NewRegisteredTimer("pathdb/closecheckpoint/time", nil)
deleteCheckpointTimer = metrics.NewRegisteredTimer("pathdb/deletecheckpoint/time", nil)
queryCheckpointTimer = metrics.NewRegisteredTimer("pathdb/querycheckpoint/time", nil)
)
3 changes: 2 additions & 1 deletion trie/triedb/pathdb/nodebufferlist.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ func (nf *nodebufferlist) flush(db ethdb.KeyValueStore, clean *fastcache.Cache,
_ = nf.popBack()

if nf.checkpointManager.needDoCheckpoint(buffer.block) {
// todo: check
persistID := nf.persistID + nf.base.layers
err := nf.base.flush(nf.db, nf.clean, persistID)
if err != nil {
Expand Down Expand Up @@ -586,7 +587,7 @@ func (nf *nodebufferlist) proposedBlockReader(blockRoot common.Hash) (layer, err
if err != nil {
context = append(context, []interface{}{"err", err}...)
log.Error("proposed block state is not available", context...)
return nil, fmt.Errorf("proposed block proof state %#x is not available", blockRoot)
return nil, fmt.Errorf("proposed block proof state %#x is not available", blockRoot.String())
}
return ckptLayer, nil
}
Expand Down

0 comments on commit e8291f4

Please sign in to comment.