diff --git a/challenger/child/handler.go b/challenger/child/handler.go index c089aaa..5b19bf7 100644 --- a/challenger/child/handler.go +++ b/challenger/child/handler.go @@ -92,6 +92,10 @@ func (ch *Child) endBlockHandler(_ context.Context, args nodetypes.EndBlockArgs) } func (ch *Child) txHandler(_ context.Context, args nodetypes.TxHandlerArgs) error { + // ignore failed tx + if !args.Success { + return nil + } txConfig := ch.Node().GetTxConfig() tx, err := txutils.DecodeTx(txConfig, args.Tx) diff --git a/challenger/child/withdraw.go b/challenger/child/withdraw.go index e8af9b8..c82a427 100644 --- a/challenger/child/withdraw.go +++ b/challenger/child/withdraw.go @@ -4,12 +4,9 @@ import ( "context" "encoding/base64" "fmt" - "strconv" "strings" "time" - "cosmossdk.io/math" - opchildtypes "github.com/initia-labs/OPinit/x/opchild/types" ophosttypes "github.com/initia-labs/OPinit/x/ophost/types" challengertypes "github.com/initia-labs/opinit-bots/challenger/types" "github.com/initia-labs/opinit-bots/types" @@ -27,30 +24,6 @@ func (ch *Child) initiateWithdrawalHandler(_ context.Context, args nodetypes.Eve if err != nil { return err } - - for _, attr := range args.EventAttributes { - switch attr.Key { - case opchildtypes.AttributeKeyL2Sequence: - l2Sequence, err = strconv.ParseUint(attr.Value, 10, 64) - if err != nil { - return err - } - case opchildtypes.AttributeKeyFrom: - from = attr.Value - case opchildtypes.AttributeKeyTo: - to = attr.Value - case opchildtypes.AttributeKeyBaseDenom: - baseDenom = attr.Value - case opchildtypes.AttributeKeyAmount: - coinAmount, ok := math.NewIntFromString(attr.Value) - if !ok { - return fmt.Errorf("invalid amount %s", attr.Value) - } - - amount = coinAmount.Uint64() - } - } - return ch.handleInitiateWithdrawal(l2Sequence, from, to, baseDenom, amount) } diff --git a/executor/batch/batch.go b/executor/batch/batch.go index 9a60c2b..d5f4f5e 100644 --- a/executor/batch/batch.go +++ b/executor/batch/batch.go @@ -120,7 +120,7 @@ func (bs *BatchSubmitter) Initialize(ctx context.Context, processedHeight int64, } fileFlag := os.O_CREATE | os.O_RDWR | os.O_APPEND - bs.batchFile, err = os.OpenFile(bs.homePath+"/batch", fileFlag, 0666) + bs.batchFile, err = os.OpenFile(bs.homePath+"/batch", fileFlag, 0640) if err != nil { return err } diff --git a/node/broadcaster/process.go b/node/broadcaster/process.go index 7d38039..2c41346 100644 --- a/node/broadcaster/process.go +++ b/node/broadcaster/process.go @@ -12,6 +12,7 @@ import ( rpccoretypes "github.com/cometbft/cometbft/rpc/core/types" btypes "github.com/initia-labs/opinit-bots/node/broadcaster/types" + "github.com/initia-labs/opinit-bots/types" ) func (b Broadcaster) GetHeight() int64 { @@ -110,7 +111,7 @@ func (b *Broadcaster) Start(ctx context.Context) error { return nil case data := <-b.txChannel: var err error - for retry := 1; retry <= 10; retry++ { + for retry := 1; retry <= 7; retry++ { err = b.handleProcessedMsgs(ctx, data) if err == nil { break @@ -122,12 +123,7 @@ func (b *Broadcaster) Start(ctx context.Context) error { break } b.logger.Warn("retry to handle processed msgs after 30 seconds", zap.Int("count", retry), zap.String("error", err.Error())) - timer := time.NewTimer(30 * time.Second) - select { - case <-ctx.Done(): - return nil - case <-timer.C: - } + types.SleepWithRetry(ctx, retry) } if err != nil { return errors.Wrap(err, "failed to handle processed msgs") diff --git a/node/process.go b/node/process.go index db1ce68..6d08d47 100644 --- a/node/process.go +++ b/node/process.go @@ -18,11 +18,14 @@ func (n *Node) blockProcessLooper(ctx context.Context, processType nodetypes.Blo timer := time.NewTicker(types.PollingInterval(ctx)) defer timer.Stop() + consecutiveErrors := 0 for { select { case <-ctx.Done(): return nil case <-timer.C: + types.SleepWithRetry(ctx, consecutiveErrors) + consecutiveErrors++ } status, err := n.rpcClient.Status(ctx) @@ -100,6 +103,7 @@ func (n *Node) blockProcessLooper(ctx context.Context, processType nodetypes.Blo n.lastProcessedBlockHeight = i } } + consecutiveErrors = 0 } } @@ -153,6 +157,7 @@ func (n *Node) handleNewBlock(ctx context.Context, block *rpccoretypes.ResultBlo LatestHeight: latestChainHeight, TxIndex: int64(txIndex), Tx: tx, + Success: blockResult.TxsResults[txIndex].Code == abcitypes.CodeTypeOK, }) if err != nil { return fmt.Errorf("failed to handle tx: tx_index: %d; %w", txIndex, err) @@ -214,11 +219,14 @@ func (n *Node) txChecker(ctx context.Context) error { timer := time.NewTicker(types.PollingInterval(ctx)) defer timer.Stop() + consecutiveErrors := 0 for { select { case <-ctx.Done(): return nil case <-timer.C: + types.SleepWithRetry(ctx, consecutiveErrors) + consecutiveErrors++ } pendingTx, res, blockTime, err := n.broadcaster.CheckPendingTx(ctx) @@ -250,5 +258,6 @@ func (n *Node) txChecker(ctx context.Context) error { if err != nil { return err } + consecutiveErrors = 0 } } diff --git a/node/types/handler.go b/node/types/handler.go index aee5376..4973b7d 100644 --- a/node/types/handler.go +++ b/node/types/handler.go @@ -25,6 +25,7 @@ type TxHandlerArgs struct { LatestHeight int64 TxIndex int64 Tx comettypes.Tx + Success bool } type TxHandlerFn func(context.Context, TxHandlerArgs) error diff --git a/provider/child/parse.go b/provider/child/parse.go index 0757d62..1c96a1d 100644 --- a/provider/child/parse.go +++ b/provider/child/parse.go @@ -11,12 +11,33 @@ import ( sdk "github.com/cosmos/cosmos-sdk/types" ) +func missingAttrsError(missingAttrs map[string]struct{}) error { + if len(missingAttrs) != 0 { + missingAttrStr := "" + for attr := range missingAttrs { + missingAttrStr += attr + " " + } + return fmt.Errorf("missing attributes: %s", missingAttrStr) + } + return nil +} + func ParseFinalizeDeposit(eventAttrs []abcitypes.EventAttribute) ( l1BlockHeight int64, l1Sequence uint64, from, to, baseDenom string, amount sdk.Coin, err error) { + missingAttrs := map[string]struct{}{ + opchildtypes.AttributeKeyL1Sequence: {}, + opchildtypes.AttributeKeySender: {}, + opchildtypes.AttributeKeyRecipient: {}, + opchildtypes.AttributeKeyDenom: {}, + opchildtypes.AttributeKeyBaseDenom: {}, + opchildtypes.AttributeKeyAmount: {}, + opchildtypes.AttributeKeyFinalizeHeight: {}, + } + for _, attr := range eventAttrs { switch attr.Key { case opchildtypes.AttributeKeyL1Sequence: @@ -44,8 +65,12 @@ func ParseFinalizeDeposit(eventAttrs []abcitypes.EventAttribute) ( if err != nil { return } + default: + continue } + delete(missingAttrs, attr.Key) } + err = missingAttrsError(missingAttrs) return } @@ -53,6 +78,11 @@ func ParseUpdateOracle(eventAttrs []abcitypes.EventAttribute) ( l1BlockHeight int64, from string, err error) { + missingAttrs := map[string]struct{}{ + opchildtypes.AttributeKeyHeight: {}, + opchildtypes.AttributeKeyFrom: {}, + } + for _, attr := range eventAttrs { switch attr.Key { case opchildtypes.AttributeKeyHeight: @@ -62,8 +92,12 @@ func ParseUpdateOracle(eventAttrs []abcitypes.EventAttribute) ( } case opchildtypes.AttributeKeyFrom: from = attr.Value + default: + continue } + delete(missingAttrs, attr.Key) } + err = missingAttrsError(missingAttrs) return } @@ -71,6 +105,14 @@ func ParseInitiateWithdrawal(eventAttrs []abcitypes.EventAttribute) ( l2Sequence, amount uint64, from, to, baseDenom string, err error) { + missingAttrs := map[string]struct{}{ + opchildtypes.AttributeKeyL2Sequence: {}, + opchildtypes.AttributeKeyFrom: {}, + opchildtypes.AttributeKeyTo: {}, + opchildtypes.AttributeKeyBaseDenom: {}, + opchildtypes.AttributeKeyAmount: {}, + } + for _, attr := range eventAttrs { switch attr.Key { case opchildtypes.AttributeKeyL2Sequence: @@ -91,7 +133,11 @@ func ParseInitiateWithdrawal(eventAttrs []abcitypes.EventAttribute) ( return } amount = coinAmount.Uint64() + default: + continue } + delete(missingAttrs, attr.Key) } + err = missingAttrsError(missingAttrs) return } diff --git a/provider/host/parse.go b/provider/host/parse.go index 93ceda8..4415da8 100644 --- a/provider/host/parse.go +++ b/provider/host/parse.go @@ -2,21 +2,41 @@ package host import ( "encoding/hex" + "fmt" "strconv" abcitypes "github.com/cometbft/cometbft/abci/types" ophosttypes "github.com/initia-labs/OPinit/x/ophost/types" ) +func missingAttrsError(missingAttrs map[string]struct{}) error { + if len(missingAttrs) != 0 { + missingAttrStr := "" + for attr := range missingAttrs { + missingAttrStr += attr + " " + } + return fmt.Errorf("missing attributes: %s", missingAttrStr) + } + return nil +} + func ParseMsgRecordBatch(eventAttrs []abcitypes.EventAttribute) ( submitter string, err error, ) { + missingAttrs := map[string]struct{}{ + ophosttypes.AttributeKeySubmitter: {}, + } + for _, attr := range eventAttrs { switch attr.Key { case ophosttypes.AttributeKeySubmitter: submitter = attr.Value + default: + continue } + delete(missingAttrs, attr.Key) } + err = missingAttrsError(missingAttrs) return } @@ -25,6 +45,14 @@ func ParseMsgUpdateBatchInfo(eventAttrs []abcitypes.EventAttribute) ( outputIndex uint64, l2BlockNumber int64, err error) { + missingAttrs := map[string]struct{}{ + ophosttypes.AttributeKeyBridgeId: {}, + ophosttypes.AttributeKeyBatchChainType: {}, + ophosttypes.AttributeKeyBatchSubmitter: {}, + ophosttypes.AttributeKeyFinalizedOutputIndex: {}, + ophosttypes.AttributeKeyFinalizedL2BlockNumber: {}, + } + for _, attr := range eventAttrs { switch attr.Key { case ophosttypes.AttributeKeyBridgeId: @@ -46,8 +74,12 @@ func ParseMsgUpdateBatchInfo(eventAttrs []abcitypes.EventAttribute) ( if err != nil { return } + default: + continue } + delete(missingAttrs, attr.Key) } + err = missingAttrsError(missingAttrs) return } @@ -55,6 +87,16 @@ func ParseMsgInitiateDeposit(eventAttrs []abcitypes.EventAttribute) ( bridgeId, l1Sequence uint64, from, to, l1Denom, l2Denom, amount string, data []byte, err error) { + missingAttrs := map[string]struct{}{ + ophosttypes.AttributeKeyBridgeId: {}, + ophosttypes.AttributeKeyL1Sequence: {}, + ophosttypes.AttributeKeyFrom: {}, + ophosttypes.AttributeKeyTo: {}, + ophosttypes.AttributeKeyL1Denom: {}, + ophosttypes.AttributeKeyL2Denom: {}, + ophosttypes.AttributeKeyAmount: {}, + ophosttypes.AttributeKeyData: {}, + } for _, attr := range eventAttrs { switch attr.Key { @@ -83,8 +125,12 @@ func ParseMsgInitiateDeposit(eventAttrs []abcitypes.EventAttribute) ( if err != nil { return } + default: + continue } + delete(missingAttrs, attr.Key) } + err = missingAttrsError(missingAttrs) return } @@ -95,6 +141,14 @@ func ParseMsgProposeOutput(eventAttrs []abcitypes.EventAttribute) ( proposer string, outputRoot []byte, err error) { + missingAttrs := map[string]struct{}{ + ophosttypes.AttributeKeyProposer: {}, + ophosttypes.AttributeKeyBridgeId: {}, + ophosttypes.AttributeKeyOutputIndex: {}, + ophosttypes.AttributeKeyL2BlockNumber: {}, + ophosttypes.AttributeKeyOutputRoot: {}, + } + for _, attr := range eventAttrs { switch attr.Key { case ophosttypes.AttributeKeyProposer: @@ -119,8 +173,12 @@ func ParseMsgProposeOutput(eventAttrs []abcitypes.EventAttribute) ( if err != nil { return } + default: + continue } + delete(missingAttrs, attr.Key) } + err = missingAttrsError(missingAttrs) return } @@ -128,6 +186,17 @@ func ParseMsgFinalizeWithdrawal(eventAttrs []abcitypes.EventAttribute) ( bridgeId, outputIndex, l2Sequence uint64, from, to, l1Denom, l2Denom, amount string, err error) { + missingAttrs := map[string]struct{}{ + ophosttypes.AttributeKeyBridgeId: {}, + ophosttypes.AttributeKeyOutputIndex: {}, + ophosttypes.AttributeKeyL2Sequence: {}, + ophosttypes.AttributeKeyFrom: {}, + ophosttypes.AttributeKeyTo: {}, + ophosttypes.AttributeKeyL1Denom: {}, + ophosttypes.AttributeKeyL2Denom: {}, + ophosttypes.AttributeKeyAmount: {}, + } + for _, attr := range eventAttrs { switch attr.Key { case ophosttypes.AttributeKeyBridgeId: @@ -155,7 +224,11 @@ func ParseMsgFinalizeWithdrawal(eventAttrs []abcitypes.EventAttribute) ( l2Denom = attr.Value case ophosttypes.AttributeKeyAmount: amount = attr.Value + default: + continue } + delete(missingAttrs, attr.Key) } + err = missingAttrsError(missingAttrs) return } diff --git a/types/retry.go b/types/retry.go new file mode 100644 index 0000000..f874ec2 --- /dev/null +++ b/types/retry.go @@ -0,0 +1,29 @@ +package types + +import ( + "context" + "math" + "math/rand/v2" + "time" +) + +const MaxRetryCount = 7 + +func SleepWithRetry(ctx context.Context, retry int) { + // to avoid to sleep too long + if retry > MaxRetryCount { + retry = MaxRetryCount + } else if retry == 0 { + return + } + + sleepTime := 2 * math.Exp2(float64(retry)) + sleepTime += rand.Float64() * float64(sleepTime) * 0.5 //nolint:all + timer := time.NewTimer(time.Duration(sleepTime) * time.Second) + defer timer.Stop() + select { + case <-ctx.Done(): + return + case <-timer.C: + } +}