Skip to content

Commit

Permalink
chore: polish buffer list flush skeleton
Browse files Browse the repository at this point in the history
  • Loading branch information
will@2012 committed Mar 28, 2024
1 parent e8291f4 commit a49c960
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 59 deletions.
4 changes: 4 additions & 0 deletions trie/triedb/pathdb/checkpoint_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,16 @@ import (
const (
// DefaultMaxCheckpointNumber is the maximum number of checkpoints to keep; the default corresponds to 3 days on main-net.
DefaultMaxCheckpointNumber = 24 * 3

// DefaultCheckpointDir is the default name of the directory in which checkpoints are stored.
DefaultCheckpointDir = "checkpoint"

// gcCheckpointIntervalSecond is used to control gc loop interval.
gcCheckpointIntervalSecond = 1

// printCheckpointStatIntervalSecond is used to print checkpoint stat.
printCheckpointStatIntervalSecond = 60

// checkpointDBKeepaliveSecond is used to detect a db that has not been used for a long time,
// so that it can be closed to reduce memory usage.
checkpointDBKeepaliveSecond = 60
Expand Down
107 changes: 48 additions & 59 deletions trie/triedb/pathdb/nodebufferlist.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"io"
"sync"
"sync/atomic"
"time"

"github.com/VictoriaMetrics/fastcache"
Expand All @@ -20,7 +19,7 @@ import (

const (
// mergeMultiDifflayerInterval defines the interval at which nodes are collected to be flushed to disk.
mergeMultiDifflayerInterval = 3
mergeMultiDifflayerInterval = 1

// DefaultProposeBlockInterval defines the interval of op-proposer proposes block.
DefaultProposeBlockInterval = 3600
Expand Down Expand Up @@ -57,9 +56,9 @@ type nodebufferlist struct {
persistID uint64 // The last state id that has been written to disk.
baseMux sync.RWMutex // The mutex of base multiDifflayer and persistID.

isFlushing atomic.Bool // Flag indicates writing disk under background.
stopFlushing atomic.Bool // Flag stops writing disk under background.
stopCh chan struct{}
forceFlushCh chan struct{}
waitForceFlushCh chan struct{}
stopCh chan struct{}

checkpointManager *checkpointManager
}
Expand Down Expand Up @@ -114,6 +113,8 @@ func newNodeBufferList(
count: 1,
persistID: rawdb.ReadPersistentStateID(db),
stopCh: make(chan struct{}),
forceFlushCh: make(chan struct{}),
waitForceFlushCh: make(chan struct{}),
checkpointManager: newCheckpointManager(db, checkpointDir, enableCheckpoint, wpBlocks, maxCheckpointNumber),
}
go nf.loop()
Expand Down Expand Up @@ -239,46 +240,8 @@ func (nf *nodebufferlist) flush(db ethdb.KeyValueStore, clean *fastcache.Cache,
defer nf.mux.Unlock()
defer nf.baseMux.Unlock()

nf.stopFlushing.Store(true)
defer nf.stopFlushing.Store(false)
for {
if nf.isFlushing.Swap(true) {
time.Sleep(time.Duration(DefaultBackgroundFlushInterval) * time.Second)
log.Info("waiting base node buffer to be flushed to disk")
continue
} else {
break
}
}

commitFunc := func(buffer *multiDifflayer) bool {
if err := nf.base.commit(buffer.root, buffer.id, buffer.block, buffer.layers, buffer.nodes); err != nil {
log.Crit("failed to commit nodes to base node buffer", "error", err)
}
_ = nf.popBack()

if nf.checkpointManager.needDoCheckpoint(buffer.block) {
// todo: check
persistID := nf.persistID + nf.base.layers
err := nf.base.flush(nf.db, nf.clean, persistID)
if err != nil {
log.Crit("failed to flush base node buffer to disk", "error", err)
}
nf.base.reset()
nf.checkpointManager.addCheckpoint(buffer.block, buffer.root)
}

return true
}
nf.traverseReverse(commitFunc)
persistID := nf.persistID + nf.base.layers
err := nf.base.flush(nf.db, nf.clean, persistID)
if err != nil {
log.Crit("failed to flush base node buffer to disk", "error", err)
}
nf.isFlushing.Store(false)
nf.base.reset()
nf.persistID = persistID
nf.forceFlushCh <- struct{}{}
<-nf.waitForceFlushCh
return nil
}

Expand Down Expand Up @@ -348,13 +311,14 @@ func (nf *nodebufferlist) getLayers() uint64 {

// waitAndStopFlushing will block until the trie nodes of trienodebuffer are written to disk.
func (nf *nodebufferlist) waitAndStopFlushing() {
nf.forceFlushCh <- struct{}{}
<-nf.waitForceFlushCh

close(nf.stopCh)
nf.stopFlushing.Store(true)
for nf.isFlushing.Load() {
time.Sleep(time.Second)
log.Warn("waiting background node buffer to be flushed to disk")
}
nf.checkpointManager.close()

nf.report()
log.Info("Succeed to stop node buffer list")
}

// setClean sets fastcache to trienodebuffer for cache the trie nodes, used for nodebufferlist.
Expand Down Expand Up @@ -534,23 +498,48 @@ func (nf *nodebufferlist) backgroundFlush() {

// loop runs the background task, collects the nodes for writing to disk.
func (nf *nodebufferlist) loop() {
mergeTicker := time.NewTicker(time.Second * mergeMultiDifflayerInterval)
loopFlushTicker := time.NewTicker(time.Second * mergeMultiDifflayerInterval)

for {
select {
case <-nf.stopCh:
return
case <-mergeTicker.C:
if nf.stopFlushing.Load() {
continue
}
if nf.isFlushing.Swap(true) {
continue
}
case <-loopFlushTicker.C: // background loop flush.
needFlush := nf.diffToBase()
if needFlush {
nf.backgroundFlush()
}
nf.isFlushing.Swap(false)
case <-nf.forceFlushCh: // force flush all.
commitFunc := func(buffer *multiDifflayer) bool {
if err := nf.base.commit(buffer.root, buffer.id, buffer.block, buffer.layers, buffer.nodes); err != nil {
log.Crit("failed to commit nodes to base node buffer", "error", err)
}
_ = nf.popBack()
if nf.checkpointManager.needDoCheckpoint(buffer.block) {
persistID := nf.persistID + nf.base.layers
err := nf.base.flush(nf.db, nf.clean, persistID)
if err != nil {
log.Crit("failed to flush base node buffer to disk", "error", err)
}
if err = nf.checkpointManager.addCheckpoint(buffer.block, buffer.root); err != nil {
log.Crit("Failed to add new disk checkpoint", "error", err)
}
nf.base.reset()
nf.persistID = persistID
}

return true
}
nf.traverseReverse(commitFunc)
persistID := nf.persistID + nf.base.layers
err := nf.base.flush(nf.db, nf.clean, persistID)
if err != nil {
log.Crit("failed to flush base node buffer to disk", "error", err)
}
nf.base.reset()
nf.persistID = persistID

nf.waitForceFlushCh <- struct{}{}
}
}
}
Expand Down

0 comments on commit a49c960

Please sign in to comment.