@@ -22,6 +22,7 @@ import (
22
22
"errors"
23
23
"fmt"
24
24
"math"
25
+ "math/big"
25
26
"os"
26
27
"path/filepath"
27
28
"sync"
@@ -62,6 +63,8 @@ type Config struct {
62
63
BloomSize uint64 // The Megabytes of memory allocated to bloom-filter
63
64
Threads int // The maximum number of threads spawned in dumpRawTrieDescendants and removeOtherRoots
64
65
CleanCacheSize int // The Megabytes of clean cache size used in dumpRawTrieDescendants
66
+
67
+ ParallelStorageTraversal bool // Whether to prune in parallel per account
65
68
}
66
69
67
70
// Pruner is an offline tool to prune the stale state with the
@@ -401,54 +404,75 @@ func dumpRawTrieDescendants(db ethdb.Database, root common.Hash, output *stateBl
401
404
output .Put (data .CodeHash , nil )
402
405
}
403
406
if data .Root != (common.Hash {}) {
404
- // note: we are passing data.Root as stateRoot here, to skip the check for stateRoot existence in trie.newTrieReader,
405
- // we already check that when opening state trie and reading the account node
406
- trieID := trie .StorageTrieID (data .Root , key , data .Root )
407
- storageTr , err := trie .NewStateTrie (trieID , sdb .TrieDB ())
408
- if err != nil {
409
- return err
410
- }
411
- err = <- results
412
- if err != nil {
413
- return err
407
+ numParallelIteration := int64 (1 )
408
+ if config .ParallelStorageTraversal {
409
+ numParallelIteration = int64 (32 )
414
410
}
415
- go func () {
416
- threadsRunning .Add (1 )
417
- defer threadsRunning .Add (- 1 )
418
- var err error
419
- defer func () {
420
- results <- err
421
- }()
422
- threadStartedAt := time .Now ()
423
- threadLastLog := time .Now ()
424
-
425
- storageIt , err := storageTr .NodeIterator (nil )
411
+ for i := int64 (1 ); i <= numParallelIteration ; i ++ {
412
+ err = <- results
426
413
if err != nil {
427
- return
414
+ return err
428
415
}
429
- var processedNodes uint64
430
- for storageIt .Next (true ) {
431
- storageTrieHash := storageIt .Hash ()
432
- if storageTrieHash != (common.Hash {}) {
433
- // The inner bloomfilter library has a mutex so concurrency is fine here
434
- err = output .Put (storageTrieHash .Bytes (), nil )
435
- if err != nil {
436
- return
416
+ go func (iteration int64 ) {
417
+ threadsRunning .Add (1 )
418
+ defer threadsRunning .Add (- 1 )
419
+ var processedNodes uint64
420
+ threadStartedAt := time .Now ()
421
+ threadLastLog := time .Now ()
422
+ var err error
423
+ defer func () {
424
+ results <- err
425
+ }()
426
+ // note: we are passing data.Root as stateRoot here, to skip the check for stateRoot existence in trie.newTrieReader,
427
+ // we already check that when opening state trie and reading the account node
428
+ // Need to create a new trie for each iteration, to avoid race conditions.
429
+ trieID := trie .StorageTrieID (data .Root , key , data .Root )
430
+ var storageTr * trie.StateTrie
431
+ storageTr , err = trie .NewStateTrie (trieID , sdb .TrieDB ())
432
+ if err != nil {
433
+ return
434
+ }
435
+ var startIt trie.NodeIterator
436
+ startIt , err = storageTr .NodeIterator (big .NewInt ((iteration - 1 ) << 3 ).Bytes ())
437
+ if err != nil {
438
+ return
439
+ }
440
+ // Traverse the storage trie, and stop if we reach the end of the trie or the end of the current part
441
+ var startItPath , endItPath []byte
442
+
443
+ key := trie .KeybytesToHex (big .NewInt ((iteration ) << 3 ).Bytes ())
444
+ key = key [:len (key )- 1 ]
445
+ for startIt .Next (true ) {
446
+ if iteration != numParallelIteration && bytes .Compare (startIt .Path (), key ) > 0 {
447
+ break
448
+ }
449
+ if startItPath == nil {
450
+ startItPath = startIt .Path ()
451
+ }
452
+ endItPath = startIt .Path ()
453
+ storageTrieHash := startIt .Hash ()
454
+ if storageTrieHash != (common.Hash {}) {
455
+ // The inner bloomfilter library has a mutex so concurrency is fine here
456
+ err = output .Put (storageTrieHash .Bytes (), nil )
457
+ if err != nil {
458
+ return
459
+ }
460
+ }
461
+ processedNodes ++
462
+ if time .Since (threadLastLog ) > 5 * time .Minute {
463
+ elapsedTotal := time .Since (startedAt )
464
+ elapsedThread := time .Since (threadStartedAt )
465
+ log .Info ("traversing trie database - traversing storage trie taking long" , "key" , key , "elapsedTotal" , elapsedTotal , "elapsedThread" , elapsedThread , "processedNodes" , processedNodes , "threadsRunning" , threadsRunning .Load ())
466
+ threadLastLog = time .Now ()
437
467
}
438
468
}
439
- processedNodes ++
440
- if time .Since (threadLastLog ) > 5 * time .Minute {
441
- elapsedTotal := time .Since (startedAt )
442
- elapsedThread := time .Since (threadStartedAt )
443
- log .Info ("traversing trie database - traversing storage trie taking long" , "key" , key , "elapsedTotal" , elapsedTotal , "elapsedThread" , elapsedThread , "processedNodes" , processedNodes , "threadsRunning" , threadsRunning .Load ())
444
- threadLastLog = time .Now ()
469
+ err = startIt .Error ()
470
+ if err != nil {
471
+ return
445
472
}
446
- }
447
- err = storageIt .Error ()
448
- if err != nil {
449
- return
450
- }
451
- }()
473
+ log .Trace ("Finished traversing storage trie" , "key" , key , "startPath" , startItPath , "endPath" , endItPath )
474
+ }(i )
475
+ }
452
476
}
453
477
}
454
478
}
0 commit comments