From 34d27a211665e964ec485fed8a4be22b6f350821 Mon Sep 17 00:00:00 2001 From: sh-cha Date: Fri, 7 Feb 2025 16:59:04 +0900 Subject: [PATCH] ignore redundant tx & check pending tx even if it timed out --- node/broadcaster/broadcaster.go | 75 +++++++++++++++------------------ node/broadcaster/tx.go | 2 + 2 files changed, 37 insertions(+), 40 deletions(-) diff --git a/node/broadcaster/broadcaster.go b/node/broadcaster/broadcaster.go index 7ca30a4..9baa437 100644 --- a/node/broadcaster/broadcaster.go +++ b/node/broadcaster/broadcaster.go @@ -3,6 +3,7 @@ package broadcaster import ( "encoding/hex" "fmt" + "regexp" "slices" "sync" "time" @@ -22,6 +23,8 @@ import ( "github.com/initia-labs/opinit-bots/types" ) +var txNotFoundRegex = regexp.MustCompile("Internal error: tx ([A-Fa-f0-9]+) not found") + type Broadcaster struct { cfg btypes.BroadcasterConfig @@ -147,53 +150,45 @@ func (b *Broadcaster) loadPendingTxs(ctx types.Context, stage types.BasicDB, las return nil } - pendingTxTime := time.Unix(0, pendingTxs[0].Timestamp).UTC() - - // if we have pending txs, wait until timeout - if timeoutTime := pendingTxTime.Add(b.cfg.TxTimeout); lastBlockTime.Before(timeoutTime) { - waitingTime := timeoutTime.Sub(lastBlockTime) - timer := time.NewTimer(waitingTime) - defer timer.Stop() - - ctx.Logger().Info("waiting for pending txs to be processed", zap.Duration("waiting_time", waitingTime)) + pollingTimer := time.NewTicker(ctx.PollingInterval()) + defer pollingTimer.Stop() - pollingTimer := time.NewTicker(ctx.PollingInterval()) - defer pollingTimer.Stop() + for { + if len(pendingTxs) == 0 { + return nil + } - WAITLOOP: - for { - if len(pendingTxs) == 0 { - return nil - } + select { + case <-ctx.Done(): + return ctx.Err() + case <-pollingTimer.C: + } - select { - case <-ctx.Done(): - return ctx.Err() - case <-timer.C: - break WAITLOOP - case <-pollingTimer.C: - } + txHash, err := hex.DecodeString(pendingTxs[0].TxHash) + if err != nil { + return err + } - txHash, err := hex.DecodeString(pendingTxs[0].TxHash) + res, err := b.rpcClient.QueryTx(ctx, txHash) + if err == nil && res != nil && res.TxResult.Code == 0 { + ctx.Logger().Debug("transaction successfully included", + zap.String("hash", pendingTxs[0].TxHash), + zap.Int64("height", res.Height)) + err = DeletePendingTx(b.db, pendingTxs[0]) if err != nil { return err } - - res, err := b.rpcClient.QueryTx(ctx, txHash) - if err == nil && res != nil && res.TxResult.Code == 0 { - ctx.Logger().Debug("transaction successfully included", - zap.String("hash", pendingTxs[0].TxHash), - zap.Int64("height", res.Height)) - err = DeletePendingTx(b.db, pendingTxs[0]) - if err != nil { - return err - } - pendingTxs = pendingTxs[1:] - } else if err == nil && res != nil { - ctx.Logger().Warn("transaction failed", - zap.String("hash", pendingTxs[0].TxHash), - zap.Uint32("code", res.TxResult.Code), - zap.String("log", res.TxResult.Log)) + pendingTxs = pendingTxs[1:] + } else if err == nil && res != nil { + ctx.Logger().Warn("transaction failed", + zap.String("hash", pendingTxs[0].TxHash), + zap.Uint32("code", res.TxResult.Code), + zap.String("log", res.TxResult.Log)) + } else if err != nil && txNotFoundRegex.FindStringSubmatch(err.Error()) != nil { + pendingTxTime := time.Unix(0, pendingTxs[0].Timestamp).UTC() + timeoutTime := pendingTxTime.Add(b.cfg.TxTimeout) + if lastBlockTime.After(timeoutTime) { + break } } } diff --git a/node/broadcaster/tx.go b/node/broadcaster/tx.go index d4b755e..a382797 100644 --- a/node/broadcaster/tx.go +++ b/node/broadcaster/tx.go @@ -21,6 +21,8 @@ var ignoringErrors = []error{ opchildtypes.ErrOracleValidatorsNotRegistered, opchildtypes.ErrInvalidOracleHeight, opchildtypes.ErrInvalidOracleTimestamp, + + opchildtypes.ErrRedundantTx, } var accountSeqRegex = regexp.MustCompile("account sequence mismatch, expected ([0-9]+), got ([0-9]+)") var outputIndexRegex = regexp.MustCompile("expected ([0-9]+), got ([0-9]+): invalid output index")