Skip to content

Commit

Permalink
rename functions and fix lint
Browse files Browse the repository at this point in the history
  • Loading branch information
beer-1 committed Aug 6, 2024
1 parent 4698cbb commit 67daf8e
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 59 deletions.
5 changes: 0 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,3 @@ require (
)

replace github.com/gogo/protobuf => github.com/regen-network/protobuf v1.3.3-alpha.regen.1

replace (
github.com/initia-labs/OPinit => ../opinit
github.com/initia-labs/OPinit/api => ../opinit/api
)
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,10 @@ github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANyt
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo=
github.com/initia-labs/OPinit v0.4.1 h1:g6IVEAOe2X31pgjk/q0zg4R1GfNj2QP3q5s3HMcWm8w=
github.com/initia-labs/OPinit v0.4.1/go.mod h1:n0eqwOnVGE1vuTnW+3jzyEXfE4ndTM0vCRGmPu9VvUc=
github.com/initia-labs/OPinit/api v0.4.1 h1:Q8etW92LiwekKZxzDYVFdiHF3uOpEA4nyajy8zpcxB0=
github.com/initia-labs/OPinit/api v0.4.1/go.mod h1:Xy/Nt3ubXLQ4zKn0m7RuQOM1sj8TVdlNNyek21TGYR0=
github.com/jhump/protoreflect v1.15.3 h1:6SFRuqU45u9hIZPJAoZ8c28T3nK64BNdp9w6jFonzls=
github.com/jhump/protoreflect v1.15.3/go.mod h1:4ORHmSBmlCW8fh3xHmJMGyul1zNqZK4Elxc8qKP+p1k=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
Expand Down
3 changes: 1 addition & 2 deletions node/broadcaster/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,7 @@ func (b *Broadcaster) GetAccountWithHeight(_ client.Context, addr sdk.AccAddress
return nil, 0, fmt.Errorf("failed to parse block height: %w", err)
}

//nolint:staticcheck
var acc authtypes.AccountI
var acc sdk.AccountI
if err := b.cdc.UnpackAny(res.Account, &acc); err != nil {
return nil, 0, err
}
Expand Down
37 changes: 19 additions & 18 deletions node/broadcaster/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ func (b Broadcaster) GetHeight() uint64 {
return b.lastProcessedBlockHeight + 1
}

func (b *Broadcaster) HandleBlock(block *rpccoretypes.ResultBlock, blockResult *rpccoretypes.ResultBlockResults, latestChainHeight uint64) error {
// HandleNewBlock is called when a new block is received.
func (b *Broadcaster) HandleNewBlock(block *rpccoretypes.ResultBlock, blockResult *rpccoretypes.ResultBlockResults, latestChainHeight uint64) error {

// check pending txs first
for _, tx := range block.Block.Txs {
Expand All @@ -28,7 +29,7 @@ func (b *Broadcaster) HandleBlock(block *rpccoretypes.ResultBlock, blockResult *

// check if the first pending tx is included in the block
if pendingTx := b.peekLocalPendingTx(); btypes.TxHash(tx) == pendingTx.TxHash {
err := b.MarkPendingTxAsProcessed(block.Block.Height, pendingTx.TxHash, pendingTx.Sequence)
err := b.RemovePendingTx(block.Block.Height, pendingTx.TxHash, pendingTx.Sequence)
if err != nil {
return err
}
Expand All @@ -51,19 +52,7 @@ func (b *Broadcaster) HandleBlock(block *rpccoretypes.ResultBlock, blockResult *
return nil
}

func (b *Broadcaster) MarkPendingTxAsProcessed(blockHeight int64, txHash string, sequence uint64) error {
err := b.deletePendingTx(sequence)
if err != nil {
return err
}

b.logger.Debug("tx inserted", zap.Int64("height", blockHeight), zap.Uint64("sequence", sequence), zap.String("txHash", txHash))
b.dequeueLocalPendingTx()

return nil
}

// CheckPendingTx checks if the pending tx is included in the block
// CheckPendingTx query tx info to check if pending tx is processed.
func (b *Broadcaster) CheckPendingTx() (*btypes.PendingTxInfo, *rpccoretypes.ResultTx, error) {
if b.lenLocalPendingTx() == 0 {
return nil, nil, nil
Expand All @@ -89,6 +78,20 @@ func (b *Broadcaster) CheckPendingTx() (*btypes.PendingTxInfo, *rpccoretypes.Res
return &pendingTx, res, nil
}

// RemovePendingTx remove pending tx from local pending txs.
// It is called when the pending tx is included in the block.
func (b *Broadcaster) RemovePendingTx(blockHeight int64, txHash string, sequence uint64) error {
err := b.deletePendingTx(sequence)
if err != nil {
return err
}

b.logger.Debug("tx inserted", zap.Int64("height", blockHeight), zap.Uint64("sequence", sequence), zap.String("txHash", txHash))
b.dequeueLocalPendingTx()

return nil
}

// Start broadcaster loop
func (b *Broadcaster) Start(ctx context.Context) error {
defer close(b.txChannelStopped)
Expand Down Expand Up @@ -123,10 +126,8 @@ func (b *Broadcaster) Start(ctx context.Context) error {
}

// @dev: these pending processed data is filled at initialization(`NewBroadcaster`).
func (b Broadcaster) BroadcastPendingProcessedMsgs() error {
func (b Broadcaster) BroadcastPendingProcessedMsgs() {
for _, processedMsg := range b.pendingProcessedMsgs {
b.BroadcastMsgs(processedMsg)
}

return nil
}
34 changes: 2 additions & 32 deletions node/broadcaster/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@ import (
"regexp"
"strconv"
"strings"
"time"

"github.com/pkg/errors"
"go.uber.org/zap"

sdkerrors "cosmossdk.io/errors"
Expand Down Expand Up @@ -37,36 +35,6 @@ var ignoringErrors = []error{
var accountSeqRegex = regexp.MustCompile("account sequence mismatch, expected ([0-9]+), got ([0-9]+)")
var outputIndexRegex = regexp.MustCompile("expected ([0-9]+), got ([0-9]+): invalid output index")

func (b *Broadcaster) txBroadcastLooper(ctx context.Context) error {
retry := time.NewTicker(30 * time.Second)

for {
select {
case <-ctx.Done():
return nil
case data := <-b.txChannel:
count := 1
select {
case <-ctx.Done():
return nil
case <-retry.C:
err := b.handleProcessedMsgs(ctx, data)
if err == nil {
break
} else if err = b.handleMsgError(err); err == nil {
break
}
b.logger.Warn("retry", zap.Int("count", count), zap.String("error", err.Error()))
count++

if count >= 10 {
return errors.Wrap(err, "failed to handle processed msgs")
}
}
}
}
}

func (b *Broadcaster) handleMsgError(err error) error {
if strs := accountSeqRegex.FindStringSubmatch(err.Error()); strs != nil {
expected, parseErr := strconv.ParseUint(strs[1], 10, 64)
Expand Down Expand Up @@ -116,6 +84,8 @@ func (b *Broadcaster) handleMsgError(err error) error {
return err
}

// HandleProcessedMsgs handles processed messages by broadcasting them to the network.
// It stores the transaction in the database and local memory and keep track of the successful broadcast.
func (b *Broadcaster) handleProcessedMsgs(ctx context.Context, data btypes.ProcessedMsgs) error {
sequence := b.txf.Sequence()
txBytes, txHash, err := b.cfg.BuildTxWithMessages(ctx, data.Msgs)
Expand Down
6 changes: 4 additions & 2 deletions node/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"go.uber.org/zap"
)

// blockProcessLooper fetches new blocks and processes them
func (n *Node) blockProcessLooper(ctx context.Context, processType nodetypes.BlockProcessType) error {
timer := time.NewTicker(nodetypes.POLLING_INTERVAL)
defer timer.Stop()
Expand Down Expand Up @@ -92,6 +93,7 @@ func (n *Node) blockProcessLooper(ctx context.Context, processType nodetypes.Blo
}
}

// fetch new block from the chain
func (n *Node) fetchNewBlock(ctx context.Context, height int64) (block *rpccoretypes.ResultBlock, blockResult *rpccoretypes.ResultBlockResults, err error) {
n.logger.Debug("fetch new block", zap.Int64("height", height))
block, err = n.rpcClient.Block(ctx, &height)
Expand All @@ -116,7 +118,7 @@ func (n *Node) handleNewBlock(block *rpccoretypes.ResultBlock, blockResult *rpcc

// handle broadcaster first to check pending txs
if n.broadcaster != nil {
err := n.broadcaster.HandleBlock(block, blockResult, latestChainHeight)
err := n.broadcaster.HandleNewBlock(block, blockResult, latestChainHeight)
if err != nil {
return err
}
Expand Down Expand Up @@ -230,7 +232,7 @@ func (n *Node) txChecker(ctx context.Context) error {
}
}

err = n.broadcaster.MarkPendingTxAsProcessed(int64(res.Height), pendingTx.TxHash, pendingTx.Sequence)
err = n.broadcaster.RemovePendingTx(int64(res.Height), pendingTx.TxHash, pendingTx.Sequence)
if err != nil {
return err
}
Expand Down

0 comments on commit 67daf8e

Please sign in to comment.