From 67daf8e6eddb815272c363fc0fa0426cfaf1c47c Mon Sep 17 00:00:00 2001 From: beer-1 <147697694+beer-1@users.noreply.github.com> Date: Tue, 6 Aug 2024 12:14:49 +0900 Subject: [PATCH] rename functions and fix lint --- go.mod | 5 ----- go.sum | 4 ++++ node/broadcaster/account.go | 3 +-- node/broadcaster/process.go | 37 +++++++++++++++++++------------------ node/broadcaster/tx.go | 34 ++-------------------------------- node/process.go | 6 ++++-- 6 files changed, 30 insertions(+), 59 deletions(-) diff --git a/go.mod b/go.mod index d7bfffa..28c6c92 100644 --- a/go.mod +++ b/go.mod @@ -182,8 +182,3 @@ require ( ) replace github.com/gogo/protobuf => github.com/regen-network/protobuf v1.3.3-alpha.regen.1 - -replace ( - github.com/initia-labs/OPinit => ../opinit - github.com/initia-labs/OPinit/api => ../opinit/api -) diff --git a/go.sum b/go.sum index b862cf9..3ae95f9 100644 --- a/go.sum +++ b/go.sum @@ -476,6 +476,10 @@ github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANyt github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo= +github.com/initia-labs/OPinit v0.4.1 h1:g6IVEAOe2X31pgjk/q0zg4R1GfNj2QP3q5s3HMcWm8w= +github.com/initia-labs/OPinit v0.4.1/go.mod h1:n0eqwOnVGE1vuTnW+3jzyEXfE4ndTM0vCRGmPu9VvUc= +github.com/initia-labs/OPinit/api v0.4.1 h1:Q8etW92LiwekKZxzDYVFdiHF3uOpEA4nyajy8zpcxB0= +github.com/initia-labs/OPinit/api v0.4.1/go.mod h1:Xy/Nt3ubXLQ4zKn0m7RuQOM1sj8TVdlNNyek21TGYR0= github.com/jhump/protoreflect v1.15.3 h1:6SFRuqU45u9hIZPJAoZ8c28T3nK64BNdp9w6jFonzls= github.com/jhump/protoreflect v1.15.3/go.mod h1:4ORHmSBmlCW8fh3xHmJMGyul1zNqZK4Elxc8qKP+p1k= github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= diff --git a/node/broadcaster/account.go b/node/broadcaster/account.go index d9097d3..231d9d2 100644 --- a/node/broadcaster/account.go +++ b/node/broadcaster/account.go @@ -68,8 +68,7 @@ func (b *Broadcaster) GetAccountWithHeight(_ client.Context, addr sdk.AccAddress return nil, 0, fmt.Errorf("failed to parse block height: %w", err) } - //nolint:staticcheck - var acc authtypes.AccountI + var acc sdk.AccountI if err := b.cdc.UnpackAny(res.Account, &acc); err != nil { return nil, 0, err } diff --git a/node/broadcaster/process.go b/node/broadcaster/process.go index bccc689..5a9aed6 100644 --- a/node/broadcaster/process.go +++ b/node/broadcaster/process.go @@ -18,7 +18,8 @@ func (b Broadcaster) GetHeight() uint64 { return b.lastProcessedBlockHeight + 1 } -func (b *Broadcaster) HandleBlock(block *rpccoretypes.ResultBlock, blockResult *rpccoretypes.ResultBlockResults, latestChainHeight uint64) error { +// HandleNewBlock is called when a new block is received. +func (b *Broadcaster) HandleNewBlock(block *rpccoretypes.ResultBlock, blockResult *rpccoretypes.ResultBlockResults, latestChainHeight uint64) error { // check pending txs first for _, tx := range block.Block.Txs { @@ -28,7 +29,7 @@ func (b *Broadcaster) HandleBlock(block *rpccoretypes.ResultBlock, blockResult * // check if the first pending tx is included in the block if pendingTx := b.peekLocalPendingTx(); btypes.TxHash(tx) == pendingTx.TxHash { - err := b.MarkPendingTxAsProcessed(block.Block.Height, pendingTx.TxHash, pendingTx.Sequence) + err := b.RemovePendingTx(block.Block.Height, pendingTx.TxHash, pendingTx.Sequence) if err != nil { return err } @@ -51,19 +52,7 @@ func (b *Broadcaster) HandleBlock(block *rpccoretypes.ResultBlock, blockResult * return nil } -func (b *Broadcaster) MarkPendingTxAsProcessed(blockHeight int64, txHash string, sequence uint64) error { - err := b.deletePendingTx(sequence) - if err != nil { - return err - } - - b.logger.Debug("tx inserted", zap.Int64("height", blockHeight), zap.Uint64("sequence", sequence), zap.String("txHash", txHash)) - b.dequeueLocalPendingTx() - - return nil -} - -// CheckPendingTx checks if the pending tx is included in the block +// CheckPendingTx query tx info to check if pending tx is processed. func (b *Broadcaster) CheckPendingTx() (*btypes.PendingTxInfo, *rpccoretypes.ResultTx, error) { if b.lenLocalPendingTx() == 0 { return nil, nil, nil @@ -89,6 +78,20 @@ func (b *Broadcaster) CheckPendingTx() (*btypes.PendingTxInfo, *rpccoretypes.Res return &pendingTx, res, nil } +// RemovePendingTx remove pending tx from local pending txs. +// It is called when the pending tx is included in the block. +func (b *Broadcaster) RemovePendingTx(blockHeight int64, txHash string, sequence uint64) error { + err := b.deletePendingTx(sequence) + if err != nil { + return err + } + + b.logger.Debug("tx inserted", zap.Int64("height", blockHeight), zap.Uint64("sequence", sequence), zap.String("txHash", txHash)) + b.dequeueLocalPendingTx() + + return nil +} + // Start broadcaster loop func (b *Broadcaster) Start(ctx context.Context) error { defer close(b.txChannelStopped) @@ -123,10 +126,8 @@ func (b *Broadcaster) Start(ctx context.Context) error { } // @dev: these pending processed data is filled at initialization(`NewBroadcaster`). -func (b Broadcaster) BroadcastPendingProcessedMsgs() error { +func (b Broadcaster) BroadcastPendingProcessedMsgs() { for _, processedMsg := range b.pendingProcessedMsgs { b.BroadcastMsgs(processedMsg) } - - return nil } diff --git a/node/broadcaster/tx.go b/node/broadcaster/tx.go index afa7a0d..b891fb2 100644 --- a/node/broadcaster/tx.go +++ b/node/broadcaster/tx.go @@ -7,9 +7,7 @@ import ( "regexp" "strconv" "strings" - "time" - "github.com/pkg/errors" "go.uber.org/zap" sdkerrors "cosmossdk.io/errors" @@ -37,36 +35,6 @@ var ignoringErrors = []error{ var accountSeqRegex = regexp.MustCompile("account sequence mismatch, expected ([0-9]+), got ([0-9]+)") var outputIndexRegex = regexp.MustCompile("expected ([0-9]+), got ([0-9]+): invalid output index") -func (b *Broadcaster) txBroadcastLooper(ctx context.Context) error { - retry := time.NewTicker(30 * time.Second) - - for { - select { - case <-ctx.Done(): - return nil - case data := <-b.txChannel: - count := 1 - select { - case <-ctx.Done(): - return nil - case <-retry.C: - err := b.handleProcessedMsgs(ctx, data) - if err == nil { - break - } else if err = b.handleMsgError(err); err == nil { - break - } - b.logger.Warn("retry", zap.Int("count", count), zap.String("error", err.Error())) - count++ - - if count >= 10 { - return errors.Wrap(err, "failed to handle processed msgs") - } - } - } - } -} - func (b *Broadcaster) handleMsgError(err error) error { if strs := accountSeqRegex.FindStringSubmatch(err.Error()); strs != nil { expected, parseErr := strconv.ParseUint(strs[1], 10, 64) @@ -116,6 +84,8 @@ func (b *Broadcaster) handleMsgError(err error) error { return err } +// HandleProcessedMsgs handles processed messages by broadcasting them to the network. +// It stores the transaction in the database and local memory and keep track of the successful broadcast. func (b *Broadcaster) handleProcessedMsgs(ctx context.Context, data btypes.ProcessedMsgs) error { sequence := b.txf.Sequence() txBytes, txHash, err := b.cfg.BuildTxWithMessages(ctx, data.Msgs) diff --git a/node/process.go b/node/process.go index bfa3a18..b09eedb 100644 --- a/node/process.go +++ b/node/process.go @@ -11,6 +11,7 @@ import ( "go.uber.org/zap" ) +// blockProcessLooper fetches new blocks and processes them func (n *Node) blockProcessLooper(ctx context.Context, processType nodetypes.BlockProcessType) error { timer := time.NewTicker(nodetypes.POLLING_INTERVAL) defer timer.Stop() @@ -92,6 +93,7 @@ func (n *Node) blockProcessLooper(ctx context.Context, processType nodetypes.Blo } } +// fetch new block from the chain func (n *Node) fetchNewBlock(ctx context.Context, height int64) (block *rpccoretypes.ResultBlock, blockResult *rpccoretypes.ResultBlockResults, err error) { n.logger.Debug("fetch new block", zap.Int64("height", height)) block, err = n.rpcClient.Block(ctx, &height) @@ -116,7 +118,7 @@ func (n *Node) handleNewBlock(block *rpccoretypes.ResultBlock, blockResult *rpcc // handle broadcaster first to check pending txs if n.broadcaster != nil { - err := n.broadcaster.HandleBlock(block, blockResult, latestChainHeight) + err := n.broadcaster.HandleNewBlock(block, blockResult, latestChainHeight) if err != nil { return err } @@ -230,7 +232,7 @@ func (n *Node) txChecker(ctx context.Context) error { } } - err = n.broadcaster.MarkPendingTxAsProcessed(int64(res.Height), pendingTx.TxHash, pendingTx.Sequence) + err = n.broadcaster.RemovePendingTx(int64(res.Height), pendingTx.TxHash, pendingTx.Sequence) if err != nil { return err }