Skip to content

Commit

Permalink
add option to enable paralle prune
Browse files Browse the repository at this point in the history
  • Loading branch information
amsanghi committed Jan 24, 2025
1 parent b192b19 commit 3dfa815
Showing 1 changed file with 74 additions and 43 deletions.
117 changes: 74 additions & 43 deletions core/state/pruner/pruner.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ type Config struct {
BloomSize uint64 // The Megabytes of memory allocated to bloom-filter
Threads int // The maximum number of threads spawned in dumpRawTrieDescendants and removeOtherRoots
CleanCacheSize int // The Megabytes of clean cache size used in dumpRawTrieDescendants

ParallelPrune bool // Whether to prune in parallel per account
}

// Pruner is an offline tool to prune the stale state with the
Expand Down Expand Up @@ -429,59 +431,88 @@ func dumpRawTrieDescendants(db ethdb.Database, root common.Hash, output *stateBl
threadLastLog := time.Now()

var processedNodes uint64
for i := int64(1); i <= 32; i++ {
err = <-resultsPerAccount
if !config.ParallelPrune {
storageIt, err := storageTr.NodeIterator(nil)
if err != nil {
return
}
var startIterator trie.NodeIterator
startIterator, err = storageTr.NodeIterator(big.NewInt((i - 1) << 3).Bytes())
var processedNodes uint64
for storageIt.Next(true) {
storageTrieHash := storageIt.Hash()
if storageTrieHash != (common.Hash{}) {
// The inner bloomfilter library has a mutex so concurrency is fine here
err = output.Put(storageTrieHash.Bytes(), nil)
if err != nil {
return
}
}
processedNodes++
if time.Since(threadLastLog) > 5*time.Minute {
elapsedTotal := time.Since(startedAt)
elapsedThread := time.Since(threadStartedAt)
log.Info("traversing trie database - traversing storage trie taking long", "key", key, "elapsedTotal", elapsedTotal, "elapsedThread", elapsedThread, "processedNodes", processedNodes, "threadsRunning", threadsRunning.Load())
threadLastLog = time.Now()
}
}
err = storageIt.Error()
if err != nil {
return
}
go func(startIt trie.NodeIterator, iteration int64) {
threadsRunning.Add(1)
defer threadsRunning.Add(-1)
var err error
defer func() {
resultsPerAccount <- err
}()

// Traverse the storage trie, and stop if we reach the end of the trie or the end of the current part
var startItPath, endItPath []byte

key := keybytesToHex(big.NewInt((iteration) << 3).Bytes())
key = key[:len(key)-1]
for startIt.Next(true) {
if iteration != 32 && bytes.Compare(startIt.Path(), key) >= 0 {
break
}
if startItPath == nil {
startItPath = startIt.Path()
}
endItPath = startIt.Path()
storageTrieHash := startIt.Hash()
if storageTrieHash != (common.Hash{}) {
// The inner bloomfilter library has a mutex so concurrency is fine here
err = output.Put(storageTrieHash.Bytes(), nil)
if err != nil {
return
}
}
processedNodes++
if time.Since(threadLastLog) > 5*time.Minute {
elapsedTotal := time.Since(startedAt)
elapsedThread := time.Since(threadStartedAt)
log.Info("traversing trie database - traversing storage trie taking long", "key", key, "elapsedTotal", elapsedTotal, "elapsedThread", elapsedThread, "processedNodes", processedNodes, "threadsRunning", threadsRunning.Load())
threadLastLog = time.Now()
}
} else {
for i := int64(1); i <= 32; i++ {
err = <-resultsPerAccount
if err != nil {
return
}
err = startIt.Error()
var startIterator trie.NodeIterator
startIterator, err = storageTr.NodeIterator(big.NewInt((i - 1) << 3).Bytes())
if err != nil {
return
}
log.Trace("Finished traversing storage trie", "key", key, "startPath", startItPath, "endPath", endItPath)
}(startIterator, i)
go func(startIt trie.NodeIterator, iteration int64) {
threadsRunning.Add(1)
defer threadsRunning.Add(-1)
var err error
defer func() {
resultsPerAccount <- err
}()

// Traverse the storage trie, and stop if we reach the end of the trie or the end of the current part
var startItPath, endItPath []byte

key := keybytesToHex(big.NewInt((iteration) << 3).Bytes())
key = key[:len(key)-1]
for startIt.Next(true) {
if iteration != 32 && bytes.Compare(startIt.Path(), key) >= 0 {
break
}
if startItPath == nil {
startItPath = startIt.Path()
}
endItPath = startIt.Path()
storageTrieHash := startIt.Hash()
if storageTrieHash != (common.Hash{}) {
// The inner bloomfilter library has a mutex so concurrency is fine here
err = output.Put(storageTrieHash.Bytes(), nil)
if err != nil {
return
}
}
processedNodes++
if time.Since(threadLastLog) > 5*time.Minute {
elapsedTotal := time.Since(startedAt)
elapsedThread := time.Since(threadStartedAt)
log.Info("traversing trie database - traversing storage trie taking long", "key", key, "elapsedTotal", elapsedTotal, "elapsedThread", elapsedThread, "processedNodes", processedNodes, "threadsRunning", threadsRunning.Load())
threadLastLog = time.Now()
}
}
err = startIt.Error()
if err != nil {
return
}
log.Trace("Finished traversing storage trie", "key", key, "startPath", startItPath, "endPath", endItPath)
}(startIterator, i)
}
}
}()
}
Expand Down

0 comments on commit 3dfa815

Please sign in to comment.