Caplin: fix bad CopyTo (#13797) (#13809)
Cherry-pick
Giulio2002 authored Feb 14, 2025
1 parent 66c897a commit dd3cff8
Showing 4 changed files with 52 additions and 48 deletions.
58 changes: 31 additions & 27 deletions cl/merkle_tree/merkle_tree.go
@@ -201,42 +201,46 @@ func (m *MerkleTree) CopyInto(other *MerkleTree) {
     m.mu.RLock()
     defer m.mu.RUnlock()
     defer other.mu.Unlock()
-    //other.computeLeaf = m.computeLeaf
-    if len(other.layers) > len(m.layers) {
-        // reset the internal layers
-        for i := len(m.layers); i < len(other.layers); i++ {
-            other.layers[i] = other.layers[i][:0]
-        }
-        other.layers = other.layers[:len(m.layers)]

+    // Copy primitive fields
+    other.computeLeaf = m.computeLeaf
+    other.leavesCount = m.leavesCount
+    if m.limit != nil {
+        other.limit = new(uint64) // Shallow copy
+        *other.limit = *m.limit
+    } else {
+        other.limit = nil
+    }

-    if len(m.layers) > len(other.layers) {
-        for len(other.layers) != len(m.layers) {
-            idx := len(other.layers)
-            other.layers = append(other.layers, make([]byte, len(m.layers[idx]), (len(m.layers[idx])*3)/2))
-        }
+    // Ensure `other.layers` has enough capacity (with +50% buffer for future growth)
+    requiredLayersLen := len(m.layers)
+    if cap(other.layers) < requiredLayersLen {
+        other.layers = make([][]byte, requiredLayersLen, requiredLayersLen+(requiredLayersLen/2))
+    } else {
+        other.layers = other.layers[:requiredLayersLen]
+    }

-    for i := 0; i < len(m.layers); i++ {
-        // If the destination buffer is too short, extend it
-        if len(m.layers[i]) > cap(other.layers[i]) {
-            other.layers[i] = make([]byte, len(m.layers[i]), (len(m.layers[i])*3)/2)
+    // Copy layers while reusing memory, and allocate with +50% extra space if needed
+    for i := range m.layers {
+        requiredLayerLen := len(m.layers[i])
+        if cap(other.layers[i]) < requiredLayerLen {
+            other.layers[i] = make([]byte, requiredLayerLen, requiredLayerLen+(requiredLayerLen/2))
+        } else {
+            other.layers[i] = other.layers[i][:requiredLayerLen]
+        }
-        // Normalizr the destination length
-        other.layers[i] = other.layers[i][:len(m.layers[i])]

         // Now that the 2 slices are of equal length we can do a simple memcopy
         copy(other.layers[i], m.layers[i])
     }

-    other.leavesCount = m.leavesCount
-    other.limit = m.limit
-    //other.dirtyLeaves = make([]atomic.Bool, len(m.dirtyLeaves))
+    // Ensure `other.dirtyLeaves` has enough capacity (with +50% buffer for future growth)
+    requiredLeavesLen := len(m.dirtyLeaves)
+    if cap(other.dirtyLeaves) < requiredLeavesLen {
+        other.dirtyLeaves = make([]atomic.Bool, requiredLeavesLen, requiredLeavesLen+(requiredLeavesLen/2))
+    } else {
+        other.dirtyLeaves = other.dirtyLeaves[:requiredLeavesLen]
+    }

-    for i := 0; i < len(m.dirtyLeaves); i++ {
-        if i >= len(other.dirtyLeaves) {
-            other.dirtyLeaves = append(other.dirtyLeaves, atomic.Bool{})
-        }
+    // Copy atomic dirty leaves state
+    for i := range m.dirtyLeaves {
         other.dirtyLeaves[i].Store(m.dirtyLeaves[i].Load())
     }
 }
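
The new CopyInto applies one pattern three times: reuse the destination buffer when its capacity already suffices, otherwise reallocate with roughly 50% headroom, then copy element by element. Below is a minimal standalone sketch of that pattern; the ensureLen and copyLayers helpers are illustrative only and are not part of the erigon codebase.

package main

import "fmt"

// ensureLen returns dst resized to n, reusing the existing backing array when
// its capacity suffices and otherwise allocating with ~50% headroom.
func ensureLen(dst []byte, n int) []byte {
	if cap(dst) < n {
		return make([]byte, n, n+n/2)
	}
	return dst[:n]
}

// copyLayers copies src into dst, reusing dst's memory where possible.
func copyLayers(dst, src [][]byte) [][]byte {
	// Resize the outer slice first.
	if cap(dst) < len(src) {
		dst = make([][]byte, len(src), len(src)+len(src)/2)
	} else {
		dst = dst[:len(src)]
	}
	// Then bring every layer to the source length and copy it over.
	for i := range src {
		dst[i] = ensureLen(dst[i], len(src[i]))
		copy(dst[i], src[i]) // equal lengths, so this copies the whole layer
	}
	return dst
}

func main() {
	src := [][]byte{{1, 2, 3}, {4, 5}}
	dst := make([][]byte, 0, 1)
	dst = copyLayers(dst, src)
	fmt.Println(dst) // [[1 2 3] [4 5]]
}

Compared with the old code, the patch also copies the value behind the limit pointer into a fresh allocation instead of sharing the pointer, so later mutations of the source tree no longer leak into the copy.
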
2 changes: 1 addition & 1 deletion cl/phase1/forkchoice/fork_graph/fork_graph_disk.go
@@ -379,7 +379,7 @@ func (f *forkGraphDisk) useCachedStateIfPossible(blockRoot libcommon.Hash, in *s
     }

     if prevHeadBlockRoot != blockRoot {
-        log.Warn("Not Using a cached beacon state", "blockRoot", blockRoot)
+        log.Debug("Not Using a cached beacon state", "blockRoot", blockRoot)
         return nil
     }
     ok = true
2 changes: 1 addition & 1 deletion cl/phase1/network/beacon_downloader.go
@@ -103,7 +103,7 @@ Loop:
         }
         // leave a warning if we are stuck for more than 90 seconds
         if time.Since(f.highestSlotUpdateTime) > 90*time.Second {
-            log.Debug("Forward beacon downloader gets stuck", "time", time.Since(f.highestSlotUpdateTime).Seconds(), "highestSlotProcessed", f.highestSlotProcessed)
+            log.Trace("Forward beacon downloader gets stuck", "time", time.Since(f.highestSlotUpdateTime).Seconds(), "highestSlotProcessed", f.highestSlotProcessed)
         }
         // this is so we do not get stuck on a side-fork
         responses, peerId, err := f.rpc.SendBeaconBlocksByRangeReq(ctx, reqSlot, reqCount)
38 changes: 19 additions & 19 deletions cl/phase1/stages/forward_sync.go
@@ -2,6 +2,7 @@ package stages

 import (
     "context"
+    "errors"
     "fmt"
     "sort"
     "sync/atomic"
@@ -16,6 +17,7 @@ import (
     "github.com/erigontech/erigon/cl/persistence/beacon_indicies"
     "github.com/erigontech/erigon/cl/persistence/blob_storage"
     "github.com/erigontech/erigon/cl/phase1/core/state"
+    "github.com/erigontech/erigon/cl/phase1/forkchoice"
     network2 "github.com/erigontech/erigon/cl/phase1/network"
 )

@@ -99,7 +101,7 @@ func downloadAndProcessEip4844DA(ctx context.Context, logger log.Logger, cfg *Cf
 // processDownloadedBlockBatches processes a batch of downloaded blocks.
 // It takes the highest block processed, a flag to determine if insertion is needed, and a list of signed beacon blocks as input.
 // It returns the new highest block processed and an error if any.
-func processDownloadedBlockBatches(ctx context.Context, cfg *Cfg, highestBlockProcessed uint64, shouldInsert bool, blocks []*cltypes.SignedBeaconBlock) (newHighestBlockProcessed uint64, err error) {
+func processDownloadedBlockBatches(ctx context.Context, logger log.Logger, cfg *Cfg, highestBlockProcessed uint64, shouldInsert bool, blocks []*cltypes.SignedBeaconBlock) (newHighestBlockProcessed uint64, err error) {
     // Pre-process the block batch to ensure that the blocks are sorted by slot in ascending order
     sort.Slice(blocks, func(i, j int) bool {
         return blocks[i].Block.Slot < blocks[j].Block.Slot
@@ -110,6 +112,13 @@ func processDownloadedBlockBatches(ctx context.Context, cfg *Cfg, highestBlockPr
         st *state.CachingBeaconState
     )
     newHighestBlockProcessed = highestBlockProcessed
+    if shouldProcessBlobs(blocks, cfg) {
+        _, err = downloadAndProcessEip4844DA(ctx, logger, cfg, highestBlockProcessed, blocks)
+        if err != nil {
+            logger.Trace("[Caplin] Failed to process blobs", "err", err)
+            return highestBlockProcessed, nil
+        }
+    }
     // Iterate over each block in the sorted list
     for _, block := range blocks {
         // Compute the hash of the current block
@@ -131,7 +140,12 @@ func processDownloadedBlockBatches(ctx context.Context, cfg *Cfg, highestBlockPr
         }

         // Process the block
-        if err = processBlock(ctx, cfg, cfg.indiciesDB, block, false, true, false); err != nil {
+        if err = processBlock(ctx, cfg, cfg.indiciesDB, block, false, true, true); err != nil {
+            if errors.Is(err, forkchoice.ErrEIP4844DataNotAvailable) {
+                // Return an error if EIP-4844 data is not available
+                logger.Trace("[Caplin] forward sync EIP-4844 data not available", "blockSlot", block.Block.Slot)
+                return highestBlockProcessed, nil
+            }
             // Return an error if block processing fails
             err = fmt.Errorf("bad blocks segment received: %w", err)
             return
@@ -193,27 +207,13 @@ func forwardSync(ctx context.Context, logger log.Logger, cfg *Cfg, args Args) er

     // Set the function to process downloaded blocks
     downloader.SetProcessFunction(func(initialHighestSlotProcessed uint64, blocks []*cltypes.SignedBeaconBlock) (newHighestSlotProcessed uint64, err error) {
-        highestSlotProcessed, err := processDownloadedBlockBatches(ctx, cfg, initialHighestSlotProcessed, shouldInsert, blocks)
+        highestSlotProcessed, err := processDownloadedBlockBatches(ctx, logger, cfg, initialHighestSlotProcessed, shouldInsert, blocks)
         if err != nil {
             logger.Warn("[Caplin] Failed to process block batch", "err", err)
             return initialHighestSlotProcessed, err
         }
-        // Exit if we are pre-EIP-4844
-        if !shouldProcessBlobs(blocks, cfg) {
-            currentSlot.Store(highestSlotProcessed)
-            return highestSlotProcessed, nil
-        }
-        // Process blobs for EIP-4844
-        highestBlobSlotProcessed, err := downloadAndProcessEip4844DA(ctx, logger, cfg, initialHighestSlotProcessed, blocks)
-        if err != nil {
-            logger.Warn("[Caplin] Failed to process blobs", "err", err)
-            return initialHighestSlotProcessed, err
-        }
-        if highestBlobSlotProcessed <= initialHighestSlotProcessed {
-            return initialHighestSlotProcessed, nil
-        }
-        currentSlot.Store(highestBlobSlotProcessed)
-        return highestBlobSlotProcessed, nil
+        currentSlot.Store(highestSlotProcessed)
+        return highestSlotProcessed, nil
     })

     // Get the current slot of the chain tip
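
The forward_sync.go changes move EIP-4844 blob download out of the downloader callback and into processDownloadedBlockBatches itself, and treat missing blob data (a failed downloadAndProcessEip4844DA, or forkchoice.ErrEIP4844DataNotAvailable from processBlock) as a soft stop: the function returns the previous highest slot with a nil error so the same range is retried later, instead of surfacing a "bad blocks segment" error. A condensed, self-contained sketch of that error-handling shape follows; block, processOne and blobsOK are stand-in names for illustration, not erigon APIs.

package main

import (
	"errors"
	"fmt"
)

// errDataNotAvailable stands in for forkchoice.ErrEIP4844DataNotAvailable.
var errDataNotAvailable = errors.New("EIP-4844 data not available")

type block struct{ slot uint64 }

func processOne(b block) error { return nil } // stand-in for processBlock

// processBatch mirrors the new control flow: blob availability is handled up
// front, and "data not available" keeps the old high-water mark with a nil
// error (a soft stop that lets the downloader retry the range later), while
// any other failure is reported as a bad segment.
func processBatch(highest uint64, blocks []block, blobsOK bool) (uint64, error) {
	if !blobsOK {
		return highest, nil // blobs missing: retry later, no error surfaced
	}
	for _, b := range blocks {
		if err := processOne(b); err != nil {
			if errors.Is(err, errDataNotAvailable) {
				return highest, nil // soft stop
			}
			return highest, fmt.Errorf("bad blocks segment received: %w", err)
		}
		if b.slot > highest {
			highest = b.slot
		}
	}
	return highest, nil
}

func main() {
	h, err := processBatch(10, []block{{slot: 11}, {slot: 12}}, true)
	fmt.Println(h, err) // 12 <nil>
}

In the diff, processBlock's final boolean also flips from false to true, which appears to be what makes the data-availability error reachable at this call site.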