Skip to content

Commit

Permalink
ignore redundant tx & check pending tx even if it timed out
Browse files Browse the repository at this point in the history
  • Loading branch information
sh-cha committed Feb 7, 2025
1 parent 0c61dcd commit 34d27a2
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 40 deletions.
75 changes: 35 additions & 40 deletions node/broadcaster/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package broadcaster
import (
"encoding/hex"
"fmt"
"regexp"
"slices"
"sync"
"time"
Expand All @@ -22,6 +23,8 @@ import (
"github.com/initia-labs/opinit-bots/types"
)

var txNotFoundRegex = regexp.MustCompile("Internal error: tx ([A-Fa-f0-9]+) not found")

type Broadcaster struct {
cfg btypes.BroadcasterConfig

Expand Down Expand Up @@ -147,53 +150,45 @@ func (b *Broadcaster) loadPendingTxs(ctx types.Context, stage types.BasicDB, las
return nil
}

pendingTxTime := time.Unix(0, pendingTxs[0].Timestamp).UTC()

// if we have pending txs, wait until timeout
if timeoutTime := pendingTxTime.Add(b.cfg.TxTimeout); lastBlockTime.Before(timeoutTime) {
waitingTime := timeoutTime.Sub(lastBlockTime)
timer := time.NewTimer(waitingTime)
defer timer.Stop()

ctx.Logger().Info("waiting for pending txs to be processed", zap.Duration("waiting_time", waitingTime))
pollingTimer := time.NewTicker(ctx.PollingInterval())
defer pollingTimer.Stop()

pollingTimer := time.NewTicker(ctx.PollingInterval())
defer pollingTimer.Stop()
for {
if len(pendingTxs) == 0 {
return nil
}

WAITLOOP:
for {
if len(pendingTxs) == 0 {
return nil
}
select {
case <-ctx.Done():
return ctx.Err()
case <-pollingTimer.C:
}

select {
case <-ctx.Done():
return ctx.Err()
case <-timer.C:
break WAITLOOP
case <-pollingTimer.C:
}
txHash, err := hex.DecodeString(pendingTxs[0].TxHash)
if err != nil {
return err
}

txHash, err := hex.DecodeString(pendingTxs[0].TxHash)
res, err := b.rpcClient.QueryTx(ctx, txHash)
if err == nil && res != nil && res.TxResult.Code == 0 {
ctx.Logger().Debug("transaction successfully included",
zap.String("hash", pendingTxs[0].TxHash),
zap.Int64("height", res.Height))
err = DeletePendingTx(b.db, pendingTxs[0])
if err != nil {
return err
}

res, err := b.rpcClient.QueryTx(ctx, txHash)
if err == nil && res != nil && res.TxResult.Code == 0 {
ctx.Logger().Debug("transaction successfully included",
zap.String("hash", pendingTxs[0].TxHash),
zap.Int64("height", res.Height))
err = DeletePendingTx(b.db, pendingTxs[0])
if err != nil {
return err
}
pendingTxs = pendingTxs[1:]
} else if err == nil && res != nil {
ctx.Logger().Warn("transaction failed",
zap.String("hash", pendingTxs[0].TxHash),
zap.Uint32("code", res.TxResult.Code),
zap.String("log", res.TxResult.Log))
pendingTxs = pendingTxs[1:]
} else if err == nil && res != nil {
ctx.Logger().Warn("transaction failed",
zap.String("hash", pendingTxs[0].TxHash),
zap.Uint32("code", res.TxResult.Code),
zap.String("log", res.TxResult.Log))
} else if err != nil && txNotFoundRegex.FindStringSubmatch(err.Error()) != nil {
pendingTxTime := time.Unix(0, pendingTxs[0].Timestamp).UTC()
timeoutTime := pendingTxTime.Add(b.cfg.TxTimeout)
if lastBlockTime.After(timeoutTime) {
break
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions node/broadcaster/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ var ignoringErrors = []error{
opchildtypes.ErrOracleValidatorsNotRegistered,
opchildtypes.ErrInvalidOracleHeight,
opchildtypes.ErrInvalidOracleTimestamp,

opchildtypes.ErrRedundantTx,
}
var accountSeqRegex = regexp.MustCompile("account sequence mismatch, expected ([0-9]+), got ([0-9]+)")
var outputIndexRegex = regexp.MustCompile("expected ([0-9]+), got ([0-9]+): invalid output index")
Expand Down

0 comments on commit 34d27a2

Please sign in to comment.