Skip to content

Commit

Permalink
fix: limit opened db number to avoid oom
Browse files Browse the repository at this point in the history
  • Loading branch information
will@2012 committed Mar 30, 2024
1 parent c750e7c commit 12c514c
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 14 deletions.
62 changes: 58 additions & 4 deletions trie/triedb/pathdb/checkpoint_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,15 @@ const (
// printCheckpointStatIntervalSecond is used to print checkpoint stat.
printCheckpointStatIntervalSecond = 60

// checkpointDBKeepaliveSecond is used to check db which has been not used for a long time,
// checkDBKeepaliveSecond is used to check db which has been not used for a long time,
// and close it for reducing memory.
checkpointDBKeepaliveSecond = 60
checkDBKeepaliveIntervalSecond = 60

// checkDBOpenedIntervalSecond is used to max opened db number to avoid too many db oom.
checkDBOpenedIntervalSecond = 1

// defaultMaxOpenedNumber is max opened number.
defaultMaxOpenedNumber = 3
)

// encodeSubCheckpointDir encodes a completed checkpoint directory.
Expand Down Expand Up @@ -76,6 +82,7 @@ type checkpointLayer struct {
waitCloseAndDeleteCh chan struct{}
tryCloseAndStopCh chan struct{}
waitCloseAndStopCh chan struct{}
tryCloseCh chan struct{}
}

var _ layer = &checkpointLayer{}
Expand Down Expand Up @@ -104,6 +111,7 @@ func newCheckpointLayer(checkpointDir string) (ckptLayer *checkpointLayer, err e
ckptLayer.waitCloseAndDeleteCh = make(chan struct{})
ckptLayer.tryCloseAndStopCh = make(chan struct{})
ckptLayer.waitCloseAndStopCh = make(chan struct{})
ckptLayer.tryCloseCh = make(chan struct{})
go ckptLayer.loop()
return ckptLayer, nil
}
Expand All @@ -119,12 +127,21 @@ func (c *checkpointLayer) loop() {
endDeleteCheckpointTimestamp time.Time
)

tryCloseTicker := time.NewTicker(time.Second * checkpointDBKeepaliveSecond)
tryCloseTicker := time.NewTicker(time.Second * checkDBKeepaliveIntervalSecond)
defer tryCloseTicker.Stop()

for {
select {
case <-tryCloseTicker.C: // close db when a long time unused for reducing memory usage.
if c.isOpened.Load() && time.Now().Sub(c.lastUsedTimestamp) > time.Second*checkpointDBKeepaliveSecond {
if c.isOpened.Load() && time.Now().Sub(c.lastUsedTimestamp) > time.Second*checkDBKeepaliveIntervalSecond {
if err = c.checkpointDB.Close(); err != nil {
log.Warn("Failed to close checkpoint db", "error", err)
} else {
c.isOpened.Store(false)
}
}
case <-c.tryCloseCh: // close db when total opened db > max for reducing memory usage.
if c.isOpened.Load() {
if err = c.checkpointDB.Close(); err != nil {
log.Warn("Failed to close checkpoint db", "error", err)
} else {
Expand Down Expand Up @@ -205,6 +222,11 @@ func (c *checkpointLayer) waitDBCloseAndStop() {
<-c.waitCloseAndStopCh
}

// closeDB closes the db.
func (c *checkpointLayer) closeDB() {
c.tryCloseCh <- struct{}{}
}

// Node implements the layer node interface.
func (c *checkpointLayer) Node(owner common.Hash, path []byte, hash common.Hash) ([]byte, error) {
var (
Expand Down Expand Up @@ -332,7 +354,10 @@ func (cm *checkpointManager) needDoCheckpoint(currentCommitBlockNumber uint64) b

func (cm *checkpointManager) loop() {
gcCheckpointTicker := time.NewTicker(time.Second * gcCheckpointIntervalSecond)
defer gcCheckpointTicker.Stop()
printCheckpointStatTicker := time.NewTicker(time.Second * printCheckpointStatIntervalSecond)
defer printCheckpointStatTicker.Stop()
checkMaxDBOpenedTicker := time.NewTicker(time.Second * checkDBOpenedIntervalSecond)

for {
select {
Expand All @@ -343,10 +368,39 @@ func (cm *checkpointManager) loop() {
cm.gcCheckpoint()
case <-printCheckpointStatTicker.C:
cm.printCheckpointStat()
case <-checkMaxDBOpenedTicker.C:
cm.checkMaxDBOpened()
}
}
}

// checkMaxDBOpened closes the earliest opened db when the number of open dbs is greater than the configuration.
func (cm *checkpointManager) checkMaxDBOpened() {
if !cm.enableCheckPoint.Load() {
return
}

var (
totalOpennedNumber int64
toCloseCkptLayer *checkpointLayer
)
cm.mux.RLock()
for _, ckptLayer := range cm.checkpointMap {
if ckptLayer.isOpened.Load() {
totalOpennedNumber = totalOpennedNumber + 1
if toCloseCkptLayer == nil || ckptLayer.lastUsedTimestamp.Before(toCloseCkptLayer.lastUsedTimestamp) {
toCloseCkptLayer = ckptLayer
}
}
}
cm.mux.RUnlock()

openedCheckpointSizeGauge.Update(totalOpennedNumber)
if totalOpennedNumber > defaultMaxOpenedNumber {
toCloseCkptLayer.closeDB()
}
}

// gcCheckpoint deletes redundant checkpoints in the background.
func (cm *checkpointManager) gcCheckpoint() {
if !cm.enableCheckPoint.Load() {
Expand Down
19 changes: 10 additions & 9 deletions trie/triedb/pathdb/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,14 @@ var (
proposedBlockReaderLessDifflayer = metrics.NewRegisteredMeter("pathdb/nodebufferlist/proposedblockreader/lessdifflayer", nil)

// checkpoint metrics
checkpointSizeGauge = metrics.NewRegisteredGauge("pathdb/checkpoint/size", nil)
addCheckpointTimer = metrics.NewRegisteredTimer("pathdb/addcheckpoint/time", nil)
newCheckpointTimer = metrics.NewRegisteredTimer("pathdb/newcheckpoint/time", nil)
openCheckpointTimer = metrics.NewRegisteredTimer("pathdb/opencheckpoint/time", nil)
getCheckpointTimer = metrics.NewRegisteredTimer("pathdb/getcheckpoint/time", nil)
gcCheckpointTimer = metrics.NewRegisteredTimer("pathdb/gccheckpoint/time", nil)
closeCheckpointTimer = metrics.NewRegisteredTimer("pathdb/closecheckpoint/time", nil)
deleteCheckpointTimer = metrics.NewRegisteredTimer("pathdb/deletecheckpoint/time", nil)
queryCheckpointTimer = metrics.NewRegisteredTimer("pathdb/querycheckpoint/time", nil)
checkpointSizeGauge = metrics.NewRegisteredGauge("pathdb/checkpoint/size", nil)
openedCheckpointSizeGauge = metrics.NewRegisteredGauge("pathdb/openedcheckpoint/size", nil)
addCheckpointTimer = metrics.NewRegisteredTimer("pathdb/addcheckpoint/time", nil)
newCheckpointTimer = metrics.NewRegisteredTimer("pathdb/newcheckpoint/time", nil)
openCheckpointTimer = metrics.NewRegisteredTimer("pathdb/opencheckpoint/time", nil)
getCheckpointTimer = metrics.NewRegisteredTimer("pathdb/getcheckpoint/time", nil)
gcCheckpointTimer = metrics.NewRegisteredTimer("pathdb/gccheckpoint/time", nil)
closeCheckpointTimer = metrics.NewRegisteredTimer("pathdb/closecheckpoint/time", nil)
deleteCheckpointTimer = metrics.NewRegisteredTimer("pathdb/deletecheckpoint/time", nil)
queryCheckpointTimer = metrics.NewRegisteredTimer("pathdb/querycheckpoint/time", nil)
)
2 changes: 1 addition & 1 deletion trie/triedb/pathdb/nodebufferlist.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ func (nf *nodebufferlist) proposedBlockReader(blockRoot common.Hash) (layer, err
log.Warn("failed to get propose block reader", "node buffer list count", nf.count)
ckptLayer, err := nf.checkpointManager.getCheckpointLayer(blockRoot)
if err != nil {
log.Error("proposed block state is not available", "block_root", blockRoot.String())
log.Error("proposed block state is not available", "block_root", blockRoot.String(), "bufferlist_count", nf.count)
return nil, fmt.Errorf("proposed block proof state %#x is not available", blockRoot.String())
}
// Just restart and need to read from checkpoint.
Expand Down

0 comments on commit 12c514c

Please sign in to comment.