Skip to content

Commit

Permalink
miner: reset ctx timeout before commit tx on new tx notif (#1434)
Browse files Browse the repository at this point in the history
* miner: reset ctx before commit transactions in main loop

* miner: log error in tx interrupt

* miner: typo

* worker: handle commit interrupt for new tx processing
  • Loading branch information
manav2401 authored Feb 10, 2025
1 parent d274e62 commit 466ff66
Showing 1 changed file with 18 additions and 9 deletions.
27 changes: 18 additions & 9 deletions miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,11 @@ func (w *worker) mainLoop() {
if gp := w.current.gasPool; gp != nil && gp.Gas() < params.TxGas {
continue
}
// If we don't have time to execute (i.e. we're past header timestamp), abort
delay := time.Until(time.Unix(int64(w.current.header.Time), 0))
if delay <= 0 {
continue
}
txs := make(map[common.Address][]*txpool.LazyTransaction, len(ev.Txs))
for _, tx := range ev.Txs {
acc, _ := types.Sender(w.current.signer, tx)
Expand All @@ -634,7 +639,14 @@ func (w *worker) mainLoop() {

tcount := w.current.tcount

w.interruptCtx = resetAndCopyInterruptCtx(w.interruptCtx)
stopFn := func() {}
if w.interruptCommitFlag {
w.interruptCtx, stopFn = getInterruptTimer(w.interruptCtx, w.current.header.Number.Uint64(), w.current.header.Time)
w.interruptCtx = vm.PutCache(w.interruptCtx, w.interruptedTxCache)
}
w.commitTransactions(w.current, plainTxs, blobTxs, nil, new(uint256.Int))
stopFn()

// Only update the snapshot if any new transactons were added
// to the pending block
Expand Down Expand Up @@ -939,7 +951,7 @@ mainloop:
select {
case <-w.interruptCtx.Done():
txCommitInterruptCounter.Inc(1)
log.Warn("Tx Level Interrupt", "hash", lastTxHash)
log.Warn("Tx Level Interrupt", "hash", lastTxHash, "err", w.interruptCtx.Err())
break mainloop
default:
}
Expand Down Expand Up @@ -1046,7 +1058,7 @@ mainloop:

logs, err := w.commitTransaction(env, tx)

// Check if we have a `delay` set in interrup context. It's only set during tests.
// Check if we have a `delay` set in interrupt context. It's only set during tests.
if w.interruptCtx != nil {
if delay := w.interruptCtx.Value(vm.InterruptCtxDelayKey); delay != nil {
// nolint : durationcheck
Expand Down Expand Up @@ -1428,8 +1440,7 @@ func (w *worker) commitWork(interrupt *atomic.Int32, noempty bool, timestamp int
}()

if !noempty && w.interruptCommitFlag {
block := w.chain.GetBlockByHash(w.chain.CurrentBlock().Hash())
w.interruptCtx, stopFn = getInterruptTimer(w.interruptCtx, work, block)
w.interruptCtx, stopFn = getInterruptTimer(w.interruptCtx, work.header.Number.Uint64(), work.header.Time)
w.interruptCtx = vm.PutCache(w.interruptCtx, w.interruptedTxCache)
}

Expand Down Expand Up @@ -1497,16 +1508,14 @@ func resetAndCopyInterruptCtx(interruptCtx context.Context) context.Context {
return newCtx
}

func getInterruptTimer(interruptCtx context.Context, work *environment, current *types.Block) (context.Context, func()) {
delay := time.Until(time.Unix(int64(work.header.Time), 0))

func getInterruptTimer(interruptCtx context.Context, number, timestamp uint64) (context.Context, func()) {
delay := time.Until(time.Unix(int64(timestamp), 0))
interruptCtx, cancel := context.WithTimeout(interruptCtx, delay)
blockNumber := current.NumberU64() + 1

go func() {
<-interruptCtx.Done()
if interruptCtx.Err() != context.Canceled {
log.Info("Commit Interrupt. Pre-committing the current block", "block", blockNumber)
log.Info("Commit Interrupt. Pre-committing the current block", "block", number)
cancel()
}
}()
Expand Down

0 comments on commit 466ff66

Please sign in to comment.