Skip to content

Commit

Permalink
Changes based on PR comments and fix race
Browse files Browse the repository at this point in the history
  • Loading branch information
amsanghi committed Jan 27, 2025
1 parent 3dfa815 commit ed1277c
Showing 1 changed file with 79 additions and 81 deletions.
160 changes: 79 additions & 81 deletions core/state/pruner/pruner.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,11 +373,6 @@ func dumpRawTrieDescendants(db ethdb.Database, root common.Hash, output *stateBl
for i := 0; i < threads; i++ {
results <- nil
}
// We also create a semaphore for the storage trie traversal, to limit the number of goroutines
resultsPerAccount := make(chan error, threads)
for i := 0; i < threads; i++ {
resultsPerAccount <- nil
}
var threadsRunning atomic.Int32

for accountIt.Next(true) {
Expand Down Expand Up @@ -409,29 +404,27 @@ func dumpRawTrieDescendants(db ethdb.Database, root common.Hash, output *stateBl
output.Put(data.CodeHash, nil)
}
if data.Root != (common.Hash{}) {
// note: we are passing data.Root as stateRoot here, to skip the check for stateRoot existence in trie.newTrieReader,
// we already check that when opening state trie and reading the account node
trieID := trie.StorageTrieID(data.Root, key, data.Root)
storageTr, err := trie.NewStateTrie(trieID, sdb.TrieDB())
if err != nil {
return err
}
err = <-results
if err != nil {
return err
}
go func() {
threadsRunning.Add(1)
defer threadsRunning.Add(-1)
var err error
defer func() {
results <- err
}()
threadStartedAt := time.Now()
threadLastLog := time.Now()

var processedNodes uint64
if !config.ParallelPrune {
if !config.ParallelPrune {
err = <-results
if err != nil {
return err
}
go func() {
threadsRunning.Add(1)
defer threadsRunning.Add(-1)
var err error
defer func() {
results <- err
}()
threadStartedAt := time.Now()
threadLastLog := time.Now()
// note: we are passing data.Root as stateRoot here, to skip the check for stateRoot existence in trie.newTrieReader,
// we already check that when opening state trie and reading the account node
trieID := trie.StorageTrieID(data.Root, key, data.Root)
storageTr, err := trie.NewStateTrie(trieID, sdb.TrieDB())
if err != nil {
return
}
storageIt, err := storageTr.NodeIterator(nil)
if err != nil {
return
Expand All @@ -458,63 +451,74 @@ func dumpRawTrieDescendants(db ethdb.Database, root common.Hash, output *stateBl
if err != nil {
return
}
} else {
for i := int64(1); i <= 32; i++ {
err = <-resultsPerAccount
}()
} else {
for i := int64(1); i <= 32; i++ {
err = <-results
if err != nil {
return err
}
go func(iteration int64) {
threadsRunning.Add(1)
defer threadsRunning.Add(-1)
var processedNodes uint64
threadStartedAt := time.Now()
threadLastLog := time.Now()
var err error
defer func() {
results <- err
}()
// note: we are passing data.Root as stateRoot here, to skip the check for stateRoot existence in trie.newTrieReader,
// we already check that when opening state trie and reading the account node
// Need to create a new trie for each iteration, to avoid race conditions.
trieID := trie.StorageTrieID(data.Root, key, data.Root)
var storageTr *trie.StateTrie
storageTr, err = trie.NewStateTrie(trieID, sdb.TrieDB())
if err != nil {
return
}
var startIterator trie.NodeIterator
startIterator, err = storageTr.NodeIterator(big.NewInt((i - 1) << 3).Bytes())
var startIt trie.NodeIterator
startIt, err = storageTr.NodeIterator(big.NewInt((i - 1) << 3).Bytes())

Check failure on line 481 in core/state/pruner/pruner.go

View workflow job for this annotation

GitHub Actions / run-linter

loopclosure: loop variable i captured by func literal (govet)
if err != nil {
return
}
go func(startIt trie.NodeIterator, iteration int64) {
threadsRunning.Add(1)
defer threadsRunning.Add(-1)
var err error
defer func() {
resultsPerAccount <- err
}()

// Traverse the storage trie, and stop if we reach the end of the trie or the end of the current part
var startItPath, endItPath []byte

key := keybytesToHex(big.NewInt((iteration) << 3).Bytes())
key = key[:len(key)-1]
for startIt.Next(true) {
if iteration != 32 && bytes.Compare(startIt.Path(), key) >= 0 {
break
}
if startItPath == nil {
startItPath = startIt.Path()
}
endItPath = startIt.Path()
storageTrieHash := startIt.Hash()
if storageTrieHash != (common.Hash{}) {
// The inner bloomfilter library has a mutex so concurrency is fine here
err = output.Put(storageTrieHash.Bytes(), nil)
if err != nil {
return
}
}
processedNodes++
if time.Since(threadLastLog) > 5*time.Minute {
elapsedTotal := time.Since(startedAt)
elapsedThread := time.Since(threadStartedAt)
log.Info("traversing trie database - traversing storage trie taking long", "key", key, "elapsedTotal", elapsedTotal, "elapsedThread", elapsedThread, "processedNodes", processedNodes, "threadsRunning", threadsRunning.Load())
threadLastLog = time.Now()
// Traverse the storage trie, and stop if we reach the end of the trie or the end of the current part
var startItPath, endItPath []byte

key := keybytesToHex(big.NewInt((iteration) << 3).Bytes())
key = key[:len(key)-1]
for startIt.Next(true) {
if iteration != 32 && bytes.Compare(startIt.Path(), key) >= 0 {
break
}
if startItPath == nil {
startItPath = startIt.Path()
}
endItPath = startIt.Path()
storageTrieHash := startIt.Hash()
if storageTrieHash != (common.Hash{}) {
// The inner bloomfilter library has a mutex so concurrency is fine here
err = output.Put(storageTrieHash.Bytes(), nil)
if err != nil {
return
}
}
err = startIt.Error()
if err != nil {
return
processedNodes++
if time.Since(threadLastLog) > 5*time.Minute {
elapsedTotal := time.Since(startedAt)
elapsedThread := time.Since(threadStartedAt)
log.Info("traversing trie database - traversing storage trie taking long", "key", key, "elapsedTotal", elapsedTotal, "elapsedThread", elapsedThread, "processedNodes", processedNodes, "threadsRunning", threadsRunning.Load())
threadLastLog = time.Now()
}
log.Trace("Finished traversing storage trie", "key", key, "startPath", startItPath, "endPath", endItPath)
}(startIterator, i)
}
}
err = startIt.Error()
if err != nil {
return
}
log.Trace("Finished traversing storage trie", "key", key, "startPath", startItPath, "endPath", endItPath)
}(i)
}
}()
}
}
}
}
Expand All @@ -527,12 +531,6 @@ func dumpRawTrieDescendants(db ethdb.Database, root common.Hash, output *stateBl
return err
}
}
for i := 0; i < threads; i++ {
err = <-resultsPerAccount
if err != nil {
return err
}
}
return nil
}

Expand Down

0 comments on commit ed1277c

Please sign in to comment.