Skip to content

Commit

Permalink
ReceiptDomain: integrity check (#13650)
Browse files Browse the repository at this point in the history
Fixes #13367 (it will take a
couple of days to generate and release the new files)

---------

Co-authored-by: JkLondon <[email protected]>
  • Loading branch information
AskAlexSharov and JkLondon authored Feb 1, 2025
1 parent 55dcac0 commit a317091
Show file tree
Hide file tree
Showing 10 changed files with 248 additions and 36 deletions.
14 changes: 7 additions & 7 deletions cmd/state/exec3/historical_trace_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,9 +281,8 @@ func doHistoryReduce(consumer TraceConsumer, db kv.TemporalRoDB, ctx context.Con
}
defer tx.Rollback()

var rwsClosed bool
for outputTxNum.Load() <= toTxNum && !rwsClosed {
rwsClosed, err = rws.DrainNonBlocking(ctx)
for outputTxNum.Load() <= toTxNum {
err = rws.DrainNonBlocking(ctx)
if err != nil {
return err
}
Expand All @@ -296,6 +295,9 @@ func doHistoryReduce(consumer TraceConsumer, db kv.TemporalRoDB, ctx context.Con
outputTxNum.Store(processedTxNum)
}
}
//if outputTxNum.Load() != toTxNum {
// return fmt.Errorf("not all txnums proceeded: toTxNum=%d, outputTxNum=%d", toTxNum, outputTxNum.Load())
//}
return nil
}
func doHistoryMap(consumer TraceConsumer, cfg *ExecArgs, ctx context.Context, in *state.QueueWithRetry, workerCount int, rws *state.ResultsQueue, logger log.Logger) error {
Expand Down Expand Up @@ -333,9 +335,7 @@ func processResultQueueHistorical(consumer TraceConsumer, rws *state.ResultsQueu
return outputTxNum, false, txTask.Error
}

if txTask.TxIndex >= 0 && !txTask.Final {
txTask.CreateReceipt(tx)
}
txTask.CreateReceipt(tx)
if err := consumer.Reduce(txTask, tx); err != nil {
return outputTxNum, false, err
}
Expand All @@ -348,7 +348,6 @@ func processResultQueueHistorical(consumer TraceConsumer, rws *state.ResultsQueu
}

func CustomTraceMapReduce(fromBlock, toBlock uint64, consumer TraceConsumer, ctx context.Context, tx kv.TemporalTx, cfg *ExecArgs, logger log.Logger) (err error) {
log.Info("[Receipt] batch start", "fromBlock", fromBlock, "toBlock", toBlock, "workers", cfg.Workers)
br := cfg.BlockReader
chainConfig := cfg.ChainConfig
if chainConfig.ChainName == networkname.Gnosis {
Expand Down Expand Up @@ -379,6 +378,7 @@ func CustomTraceMapReduce(fromBlock, toBlock uint64, consumer TraceConsumer, ctx
WorkerCount = cfg.Workers
}

log.Info("[Receipt] batch start", "fromBlock", fromBlock, "toBlock", toBlock, "workers", cfg.Workers, "toTxNum", toTxNum)
getHeaderFunc := func(hash common.Hash, number uint64) (h *types.Header) {
if tx != nil && WorkerCount == 1 {
h, _ = cfg.BlockReader.Header(ctx, tx, hash, number)
Expand Down
17 changes: 10 additions & 7 deletions core/state/txtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ func (q *ResultsQueue) Add(ctx context.Context, task *TxTask) error {
}
return nil
}
func (q *ResultsQueue) drainNoBlock(ctx context.Context, task *TxTask) (closed bool, err error) {
func (q *ResultsQueue) drainNoBlock(ctx context.Context, task *TxTask) (err error) {
q.m.Lock()
defer q.m.Unlock()
if task != nil {
Expand All @@ -385,20 +385,21 @@ func (q *ResultsQueue) drainNoBlock(ctx context.Context, task *TxTask) (closed b
for {
select {
case <-ctx.Done():
return q.closed, ctx.Err()
return ctx.Err()
case txTask, ok := <-q.resultCh:
if !ok {
return q.closed, nil
//log.Warn("[dbg] closed1")
return nil
}
if txTask == nil {
continue
}
heap.Push(q.results, txTask)
if q.results.Len() > q.limit {
return q.closed, nil
return nil
}
default: // we are inside mutex section, can't block here
return q.closed, nil
return nil
}
}
}
Expand Down Expand Up @@ -431,7 +432,7 @@ func (q *ResultsQueue) Drain(ctx context.Context) error {
if !ok {
return nil
}
if _, err := q.drainNoBlock(ctx, txTask); err != nil {
if err := q.drainNoBlock(ctx, txTask); err != nil {
return err
}
case <-q.ticker.C:
Expand All @@ -448,7 +449,7 @@ func (q *ResultsQueue) Drain(ctx context.Context) error {
}

// DrainNonBlocking - does drain batch of results to heap. Immediately stops at `q.limit` or if nothing to drain
func (q *ResultsQueue) DrainNonBlocking(ctx context.Context) (closed bool, err error) {
func (q *ResultsQueue) DrainNonBlocking(ctx context.Context) (err error) {
return q.drainNoBlock(ctx, nil)
}

Expand Down Expand Up @@ -477,6 +478,8 @@ Loop:
}

func (q *ResultsQueue) Close() {
q.m.Lock()
defer q.m.Unlock()
if q.closed {
return
}
Expand Down
1 change: 0 additions & 1 deletion erigon-lib/kv/kv_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,6 @@ type TemporalPutDel interface {
// Optimizations:
// - user can provide `prevVal != nil` - then it will not read prev value from storage
// - user can append k2 into k1, then underlying methods will not perform append
// - if `val == nil` it will call DomainDel
DomainPut(domain Domain, k1, k2 []byte, val, prevVal []byte, prevStep uint64) error

// DomainDel
Expand Down
22 changes: 19 additions & 3 deletions erigon-lib/state/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ var sortableBuffersPoolForPruning = sync.Pool{
},
}

// sortableBufferForPruning takes a buffer out of sortableBuffersPoolForPruning
// and resets it before handing it to the caller, so the caller never sees
// content left over from a previous use.
func sortableBufferForPruning() etl.Buffer {
	buf := sortableBuffersPoolForPruning.Get().(etl.Buffer)
	buf.Reset()
	return buf
}

var (
asserts = dbg.EnvBool("AGG_ASSERTS", false)
traceFileLife = dbg.EnvString("AGG_TRACE_FILE_LIFE", "")
Expand Down Expand Up @@ -1531,10 +1537,19 @@ func (dt *DomainRoTx) GetAsOf(key []byte, txNum uint64, roTx kv.Tx) ([]byte, boo
}
return v, v != nil, nil
}
v, _, _, err = dt.GetLatest(key, roTx)

var ok bool
v, _, ok, err = dt.GetLatest(key, roTx)
if err != nil {
return nil, false, err
}
if traceGetAsOf == dt.d.filenameBase {
if ok {
fmt.Printf("DomainGetAsOf(%s, %x, %d) -> found in latest state\n", dt.d.filenameBase, key, txNum)
} else {
fmt.Printf("DomainGetAsOf(%s, %x, %d) -> not found in latest state\n", dt.d.filenameBase, key, txNum)
}
}
return v, v != nil, nil
}

Expand Down Expand Up @@ -1874,8 +1889,7 @@ func (dt *DomainRoTx) Prune(ctx context.Context, rwTx kv.RwTx, step, txFrom, txT

var valsCursor kv.RwCursor

sortableBuffer := sortableBuffersPoolForPruning.Get().(etl.Buffer)
sortableBuffer.Reset()
sortableBuffer := sortableBufferForPruning()
defer sortableBuffersPoolForPruning.Put(sortableBuffer)

ancientDomainValsCollector := etl.NewCollector(dt.name.String()+".domain.collate", dt.d.dirs.Tmp, sortableBuffer, dt.d.logger).LogLvl(log.LvlTrace)
Expand Down Expand Up @@ -2021,3 +2035,5 @@ func (dt *DomainRoTx) Files() (res []string) {
return append(res, dt.ht.Files()...)
}
// Name returns the kv.Domain held in this read transaction's name field.
func (dt *DomainRoTx) Name() kv.Domain {
	return dt.name
}

// DbgMaxTxNumInDB is a debugging helper: it forwards to maxTxNumInDB of the
// inverted index reachable through this domain's history (dt.ht.iit.ii) and
// returns its result for the given tx.
func (dt *DomainRoTx) DbgMaxTxNumInDB(tx kv.Tx) uint64 {
	invIdx := dt.ht.iit.ii
	return invIdx.maxTxNumInDB(tx)
}
3 changes: 1 addition & 2 deletions erigon-lib/state/inverted_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -852,8 +852,7 @@ func (iit *InvertedIndexRoTx) Prune(ctx context.Context, rwTx kv.RwTx, txFrom, t
}
defer idxDelCursor.Close()

sortableBuffer := sortableBuffersPoolForPruning.Get().(etl.Buffer)
sortableBuffer.Reset()
sortableBuffer := sortableBufferForPruning()
defer sortableBuffersPoolForPruning.Put(sortableBuffer)

collector := etl.NewCollector(ii.filenameBase+".prune.ii", ii.dirs.Tmp, sortableBuffer, ii.logger)
Expand Down
3 changes: 2 additions & 1 deletion eth/integrity/integrity_action_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ const (
InvertedIndex Check = "InvertedIndex"
HistoryNoSystemTxs Check = "HistoryNoSystemTxs"
NoBorEventGaps Check = "NoBorEventGaps"
ReceiptsNoDups Check = "ReceiptsNoDups"
)

var AllChecks = []Check{
Blocks, BlocksTxnID, InvertedIndex, HistoryNoSystemTxs, NoBorEventGaps,
Blocks, BlocksTxnID, InvertedIndex, HistoryNoSystemTxs, NoBorEventGaps, ReceiptsNoDups,
}
103 changes: 103 additions & 0 deletions eth/integrity/receipts_no_duplicates.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package integrity

import (
"context"
"fmt"
"time"

"github.com/erigontech/erigon-lib/kv"
"github.com/erigontech/erigon-lib/kv/rawdbv3"
"github.com/erigontech/erigon-lib/log/v3"
"github.com/erigontech/erigon-lib/state"
"github.com/erigontech/erigon/core/rawdb/rawtemporaldb"
"github.com/erigontech/erigon/eth/stagedsync/stages"
"github.com/erigontech/erigon/turbo/services"
"github.com/erigontech/erigon/turbo/snapshotsync/freezeblocks"
)

// ReceiptsNoDuplicates walks every txNum from the first txNum of block 1 up to
// the last txNum of the block preceding the Execution stage progress and reads
// the cumulative gas used stored for it via rawtemporaldb.ReceiptAsOf. Two
// consecutive txNums that belong to the same block and carry the same non-zero
// cumulative gas used are reported as a bad receipt.
//
// When failFast is true the first violation is returned immediately. Otherwise
// every violation is logged, the scan continues, and the first violation is
// returned after the scan completes. Cancellation of ctx is reported as
// ctx.Err(); read errors are returned as-is.
func ReceiptsNoDuplicates(ctx context.Context, db kv.TemporalRoDB, blockReader services.FullBlockReader, failFast bool) (err error) {
	defer func() {
		log.Info("[integrity] ReceiptsNoDuplicates: done", "err", err)
	}()

	logEvery := time.NewTicker(10 * time.Second)
	defer logEvery.Stop()

	tx, err := db.BeginTemporalRo(ctx)
	if err != nil {
		return err
	}
	defer tx.Rollback()

	fromBlock := uint64(1)
	toBlock, err := stages.GetStageProgress(tx, stages.Execution)
	if err != nil {
		return err
	}

	txNumsReader := rawdbv3.TxNums.WithCustomReadTxNumFunc(freezeblocks.ReadTxNumFuncFromBlockReader(ctx, blockReader))
	fromTxNum, err := txNumsReader.Min(tx, fromBlock)
	if err != nil {
		return err
	}
	if toBlock > 0 {
		toBlock-- // stop one block short of the Execution stage progress
	}

	ac := tx.(state.HasAggTx).AggTx().(*state.AggregatorRoTx)
	toTxNum, err := txNumsReader.Max(tx, toBlock)
	if err != nil {
		return err
	}
	log.Info("[integrity] ReceiptsNoDuplicates starting", "fromTxNum", fromTxNum, "toTxNum", toTxNum)

	// Informational only: a mismatch between the two domains is logged but
	// does not fail the check.
	receiptProgress := ac.DbgDomain(kv.ReceiptDomain).DbgMaxTxNumInDB(tx)
	accProgress := ac.DbgDomain(kv.AccountsDomain).DbgMaxTxNumInDB(tx)
	if accProgress != receiptProgress {
		log.Warn(fmt.Sprintf("[integrity] ReceiptDomain=%d is behind AccountDomain=%d", receiptProgress, accProgress))
	}

	var (
		firstErr error
		// Zero doubles as "no previous value": a violation requires a
		// non-zero cumulative gas used, so zero can never match.
		prevCumGasUsed uint64
		prevBN         = uint64(1)
		cumGasUsed     uint64
		blockNum       uint64
	)
	for txNum := fromTxNum; txNum <= toTxNum; txNum++ {
		cumGasUsed, _, _, err = rawtemporaldb.ReceiptAsOf(tx, txNum)
		if err != nil {
			return err
		}
		// Resume the block search one block before the previously found one.
		blockNum, err = badFoundBlockNum(tx, prevBN-1, txNumsReader, txNum)
		if err != nil {
			return err
		}
		if cumGasUsed != 0 && cumGasUsed == prevCumGasUsed && blockNum == prevBN {
			badErr := fmt.Errorf("bad receipt at txnum: %d, block: %d, cumGasUsed=%d, prevCumGasUsed=%d", txNum, blockNum, cumGasUsed, prevCumGasUsed)
			if failFast {
				return badErr
			}
			log.Warn(badErr.Error())
			if firstErr == nil {
				firstErr = badErr
			}
		}
		prevCumGasUsed = cumGasUsed
		prevBN = blockNum

		select {
		case <-ctx.Done():
			// Report cancellation explicitly instead of returning a nil error.
			return ctx.Err()
		case <-logEvery.C:
			log.Info("[integrity] ReceiptsNoDuplicates", "progress", fmt.Sprintf("%dk/%dk", blockNum/1_000, toBlock/1_000))
		default:
		}
	}

	return firstErr
}

// badFoundBlockNum scans blocks linearly, starting at fromBlock, and returns
// the first block whose txNumsReader.Max value is not below curTxNum. Any error
// from txNumsReader.Max aborts the scan and is returned to the caller.
func badFoundBlockNum(tx kv.Tx, fromBlock uint64, txNumsReader rawdbv3.TxNumsReader, curTxNum uint64) (uint64, error) {
	for blockNum := fromBlock; ; blockNum++ {
		txNumMax, err := txNumsReader.Max(tx, blockNum)
		if err != nil {
			return 0, fmt.Errorf("reading max txNum of block %d: %w", blockNum, err)
		}
		if txNumMax >= curTxNum {
			return blockNum, nil
		}
	}
}
2 changes: 1 addition & 1 deletion eth/stagedsync/exec3_serial.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (se *serialExecutor) execute(ctx context.Context, tasks []*state.TxTask) (c

if !txTask.Final {
var receipt *types.Receipt
if txTask.TxIndex >= 0 && !txTask.Final {
if txTask.TxIndex >= 0 {
receipt = txTask.BlockReceipts[txTask.TxIndex]
}
if err := rawtemporaldb.AppendReceipt(se.doms, receipt, se.blobGasUsed); err != nil {
Expand Down
Loading

0 comments on commit a317091

Please sign in to comment.