@@ -22,6 +22,7 @@ import (
2222 "errors"
2323 "fmt"
2424 "math"
25+ "math/big"
2526 "os"
2627 "path/filepath"
2728 "sync"
@@ -62,6 +63,8 @@ type Config struct {
6263 BloomSize uint64 // The Megabytes of memory allocated to bloom-filter
6364 Threads int // The maximum number of threads spawned in dumpRawTrieDescendants and removeOtherRoots
6465 CleanCacheSize int // The Megabytes of clean cache size used in dumpRawTrieDescendants
66+
67+ ParallelStorageTraversal bool // Whether to prune in parallel per account
6568}
6669
6770// Pruner is an offline tool to prune the stale state with the
@@ -401,54 +404,75 @@ func dumpRawTrieDescendants(db ethdb.Database, root common.Hash, output *stateBl
401404 output .Put (data .CodeHash , nil )
402405 }
403406 if data .Root != (common.Hash {}) {
404- // note: we are passing data.Root as stateRoot here, to skip the check for stateRoot existence in trie.newTrieReader,
405- // we already check that when opening state trie and reading the account node
406- trieID := trie .StorageTrieID (data .Root , key , data .Root )
407- storageTr , err := trie .NewStateTrie (trieID , sdb .TrieDB ())
408- if err != nil {
409- return err
410- }
411- err = <- results
412- if err != nil {
413- return err
407+ numParallelIteration := int64 (1 )
408+ if config .ParallelStorageTraversal {
409+ numParallelIteration = int64 (32 )
414410 }
415- go func () {
416- threadsRunning .Add (1 )
417- defer threadsRunning .Add (- 1 )
418- var err error
419- defer func () {
420- results <- err
421- }()
422- threadStartedAt := time .Now ()
423- threadLastLog := time .Now ()
424-
425- storageIt , err := storageTr .NodeIterator (nil )
411+ for i := int64 (1 ); i <= numParallelIteration ; i ++ {
412+ err = <- results
426413 if err != nil {
427- return
414+ return err
428415 }
429- var processedNodes uint64
430- for storageIt .Next (true ) {
431- storageTrieHash := storageIt .Hash ()
432- if storageTrieHash != (common.Hash {}) {
433- // The inner bloomfilter library has a mutex so concurrency is fine here
434- err = output .Put (storageTrieHash .Bytes (), nil )
435- if err != nil {
436- return
416+ go func (iteration int64 ) {
417+ threadsRunning .Add (1 )
418+ defer threadsRunning .Add (- 1 )
419+ var processedNodes uint64
420+ threadStartedAt := time .Now ()
421+ threadLastLog := time .Now ()
422+ var err error
423+ defer func () {
424+ results <- err
425+ }()
426+ // note: we are passing data.Root as stateRoot here, to skip the check for stateRoot existence in trie.newTrieReader,
427+ // we already check that when opening state trie and reading the account node
428+ // Need to create a new trie for each iteration, to avoid race conditions.
429+ trieID := trie .StorageTrieID (data .Root , key , data .Root )
430+ var storageTr * trie.StateTrie
431+ storageTr , err = trie .NewStateTrie (trieID , sdb .TrieDB ())
432+ if err != nil {
433+ return
434+ }
435+ var startIt trie.NodeIterator
436+ startIt , err = storageTr .NodeIterator (big .NewInt ((iteration - 1 ) << 3 ).Bytes ())
437+ if err != nil {
438+ return
439+ }
440+ // Traverse the storage trie, and stop if we reach the end of the trie or the end of the current part
441+ var startItPath , endItPath []byte
442+
443+ key := trie .KeybytesToHex (big .NewInt ((iteration ) << 3 ).Bytes ())
444+ key = key [:len (key )- 1 ]
445+ for startIt .Next (true ) {
446+ if iteration != numParallelIteration && bytes .Compare (startIt .Path (), key ) > 0 {
447+ break
448+ }
449+ if startItPath == nil {
450+ startItPath = startIt .Path ()
451+ }
452+ endItPath = startIt .Path ()
453+ storageTrieHash := startIt .Hash ()
454+ if storageTrieHash != (common.Hash {}) {
455+ // The inner bloomfilter library has a mutex so concurrency is fine here
456+ err = output .Put (storageTrieHash .Bytes (), nil )
457+ if err != nil {
458+ return
459+ }
460+ }
461+ processedNodes ++
462+ if time .Since (threadLastLog ) > 5 * time .Minute {
463+ elapsedTotal := time .Since (startedAt )
464+ elapsedThread := time .Since (threadStartedAt )
465+ log .Info ("traversing trie database - traversing storage trie taking long" , "key" , key , "elapsedTotal" , elapsedTotal , "elapsedThread" , elapsedThread , "processedNodes" , processedNodes , "threadsRunning" , threadsRunning .Load ())
466+ threadLastLog = time .Now ()
437467 }
438468 }
439- processedNodes ++
440- if time .Since (threadLastLog ) > 5 * time .Minute {
441- elapsedTotal := time .Since (startedAt )
442- elapsedThread := time .Since (threadStartedAt )
443- log .Info ("traversing trie database - traversing storage trie taking long" , "key" , key , "elapsedTotal" , elapsedTotal , "elapsedThread" , elapsedThread , "processedNodes" , processedNodes , "threadsRunning" , threadsRunning .Load ())
444- threadLastLog = time .Now ()
469+ err = startIt .Error ()
470+ if err != nil {
471+ return
445472 }
446- }
447- err = storageIt .Error ()
448- if err != nil {
449- return
450- }
451- }()
473+ log .Trace ("Finished traversing storage trie" , "key" , key , "startPath" , startItPath , "endPath" , endItPath )
474+ }(i )
475+ }
452476 }
453477 }
454478 }
0 commit comments