Skip to content

Commit 5f2e7de

Browse files
authored
Fix/tx checker (#44)
* find tx using txsearch with tx hash * change sequence * query block header instead of block * handle tx failed * remove redunt sequence err
1 parent 4434dfb commit 5f2e7de

File tree

8 files changed

+124
-120
lines changed

8 files changed

+124
-120
lines changed

node/broadcaster/account.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ import (
1818

1919
var _ client.AccountRetriever = &Broadcaster{}
2020

21-
func (b *Broadcaster) loadAccount() error {
22-
account, err := b.GetAccount(b.getClientCtx(), b.keyAddress)
21+
func (b *Broadcaster) loadAccount(ctx context.Context) error {
22+
account, err := b.GetAccount(b.getClientCtx(ctx), b.keyAddress)
2323
if err != nil {
2424
return err
2525
}
@@ -45,15 +45,15 @@ func (b *Broadcaster) GetAccount(clientCtx client.Context, addr sdk.AccAddress)
4545
// GetAccountWithHeight queries for an account given an address. Returns the
4646
// height of the query with the account. An error is returned if the query
4747
// or decoding fails.
48-
func (b *Broadcaster) GetAccountWithHeight(_ client.Context, addr sdk.AccAddress) (client.Account, int64, error) {
48+
func (b *Broadcaster) GetAccountWithHeight(clienCtx client.Context, addr sdk.AccAddress) (client.Account, int64, error) {
4949
var header metadata.MD
5050
address, err := keys.EncodeBech32AccAddr(addr, b.cfg.Bech32Prefix)
5151
if err != nil {
5252
return nil, 0, err
5353
}
5454

5555
queryClient := authtypes.NewQueryClient(b.rpcClient)
56-
res, err := queryClient.Account(context.Background(), &authtypes.QueryAccountRequest{Address: address}, grpc.Header(&header))
56+
res, err := queryClient.Account(clienCtx.CmdContext, &authtypes.QueryAccountRequest{Address: address}, grpc.Header(&header))
5757
if err != nil {
5858
return nil, 0, err
5959
}

node/broadcaster/broadcaster.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -122,12 +122,13 @@ func (b *Broadcaster) Initialize(ctx context.Context, status *rpccoretypes.Resul
122122
return b.prepareBroadcaster(ctx, status.SyncInfo.LatestBlockTime)
123123
}
124124

125-
func (b Broadcaster) getClientCtx() client.Context {
125+
func (b Broadcaster) getClientCtx(ctx context.Context) client.Context {
126126
return client.Context{}.WithClient(b.rpcClient).
127127
WithInterfaceRegistry(b.cdc.InterfaceRegistry()).
128128
WithChainID(b.cfg.ChainID).
129129
WithCodec(b.cdc).
130-
WithFromAddress(b.keyAddress)
130+
WithFromAddress(b.keyAddress).
131+
WithCmdContext(ctx)
131132
}
132133

133134
func (b Broadcaster) GetTxf() tx.Factory {
@@ -144,7 +145,7 @@ func (b *Broadcaster) prepareBroadcaster(ctx context.Context, lastBlockTime time
144145
WithKeybase(b.keyBase).
145146
WithSignMode(signing.SignMode_SIGN_MODE_DIRECT)
146147

147-
err := b.loadAccount()
148+
err := b.loadAccount(ctx)
148149
if err != nil {
149150
return err
150151
}

node/broadcaster/process.go

Lines changed: 38 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -20,85 +20,60 @@ func (b Broadcaster) GetHeight() int64 {
2020
return b.lastProcessedBlockHeight + 1
2121
}
2222

23-
// HandleNewBlock is called when a new block is received.
24-
func (b *Broadcaster) HandleNewBlock(block *rpccoretypes.ResultBlock, blockResult *rpccoretypes.ResultBlockResults, latestChainHeight int64) error {
25-
// check pending txs first
26-
for _, tx := range block.Block.Txs {
27-
if b.LenLocalPendingTx() == 0 {
28-
break
23+
// CheckPendingTx query tx info to check if pending tx is processed.
24+
func (b *Broadcaster) CheckPendingTx(ctx context.Context, pendingTx btypes.PendingTxInfo) (*rpccoretypes.ResultTx, time.Time, error) {
25+
txHash, err := hex.DecodeString(pendingTx.TxHash)
26+
if err != nil {
27+
return nil, time.Time{}, err
28+
}
29+
res, txerr := b.rpcClient.QueryTx(ctx, txHash)
30+
if txerr != nil {
31+
// if the tx is not found, it means the tx is not processed yet
32+
// or the tx is not indexed by the node in rare cases.
33+
lastHeader, err := b.rpcClient.Header(ctx, nil)
34+
if err != nil {
35+
return nil, time.Time{}, err
2936
}
30-
31-
// check if the first pending tx is included in the block
32-
if pendingTx := b.peekLocalPendingTx(); btypes.TxHash(tx) == pendingTx.TxHash {
33-
err := b.RemovePendingTx(block.Block.Height, pendingTx.TxHash, pendingTx.Sequence, pendingTx.MsgTypes)
37+
pendingTxTime := time.Unix(0, pendingTx.Timestamp)
38+
39+
// before timeout
40+
if lastHeader.Header.Time.Before(pendingTxTime.Add(b.cfg.TxTimeout)) {
41+
b.logger.Debug("failed to query tx", zap.String("tx_hash", pendingTx.TxHash), zap.String("error", txerr.Error()))
42+
return nil, time.Time{}, types.ErrTxNotFound
43+
} else {
44+
// timeout case
45+
account, err := b.GetAccount(b.getClientCtx(ctx), b.keyAddress)
3446
if err != nil {
35-
return err
47+
return nil, time.Time{}, err
3648
}
37-
}
38-
}
3949

40-
// check timeout of pending txs
41-
// @sh-cha: should we rebroadcast pending txs? or raising monitoring alert?
42-
if length := b.LenLocalPendingTx(); length > 0 {
43-
b.logger.Debug("remaining pending txs", zap.Int64("height", block.Block.Height), zap.Int("count", length))
44-
pendingTxTime := time.Unix(0, b.peekLocalPendingTx().Timestamp)
45-
if block.Block.Time.After(pendingTxTime.Add(b.cfg.TxTimeout)) {
46-
panic(fmt.Errorf("something wrong, pending txs are not processed for a long time; current block time: %s, pending tx processing time: %s", block.Block.Time.UTC().String(), pendingTxTime.UTC().String()))
50+
// if sequence is larger than the sequence of the pending tx,
51+
// handle it as the tx has already been processed
52+
if pendingTx.Sequence < account.GetSequence() {
53+
return nil, time.Time{}, nil
54+
}
55+
panic(fmt.Errorf("something wrong, pending txs are not processed for a long time; current block time: %s, pending tx processing time: %s", time.Now().UTC().String(), pendingTxTime.UTC().String()))
4756
}
57+
} else if res.TxResult.Code != 0 {
58+
panic(fmt.Errorf("tx failed, tx hash: %s, code: %d, log: %s; you might need to check gas adjustment config or balance", pendingTx.TxHash, res.TxResult.Code, res.TxResult.Log))
4859
}
4960

50-
// update last processed block height
51-
b.lastProcessedBlockHeight = latestChainHeight
52-
53-
return nil
54-
}
55-
56-
// CheckPendingTx query tx info to check if pending tx is processed.
57-
func (b *Broadcaster) CheckPendingTx(ctx context.Context) (*btypes.PendingTxInfo, *rpccoretypes.ResultTx, time.Time, error) {
58-
if b.LenLocalPendingTx() == 0 {
59-
return nil, nil, time.Time{}, nil
60-
}
61-
62-
pendingTx := b.peekLocalPendingTx()
63-
pendingTxTime := time.Unix(0, b.peekLocalPendingTx().Timestamp)
64-
65-
lastBlockResult, err := b.rpcClient.Block(ctx, nil)
66-
if err != nil {
67-
return nil, nil, time.Time{}, err
68-
}
69-
if lastBlockResult.Block.Time.After(pendingTxTime.Add(b.cfg.TxTimeout)) {
70-
// @sh-cha: should we rebroadcast pending txs? or raising monitoring alert?
71-
panic(fmt.Errorf("something wrong, pending txs are not processed for a long time; current block time: %s, pending tx processing time: %s", time.Now().UTC().String(), pendingTxTime.UTC().String()))
72-
}
73-
74-
txHash, err := hex.DecodeString(pendingTx.TxHash)
75-
if err != nil {
76-
return nil, nil, time.Time{}, err
77-
}
78-
res, err := b.rpcClient.QueryTx(ctx, txHash)
79-
if err != nil {
80-
b.logger.Debug("failed to query tx", zap.String("tx_hash", pendingTx.TxHash), zap.String("error", err.Error()))
81-
return nil, nil, time.Time{}, nil
82-
}
83-
84-
blockResult, err := b.rpcClient.Block(ctx, &res.Height)
61+
header, err := b.rpcClient.Header(ctx, &res.Height)
8562
if err != nil {
86-
return nil, nil, time.Time{}, err
63+
return nil, time.Time{}, err
8764
}
88-
return &pendingTx, res, blockResult.Block.Time, nil
65+
return res, header.Header.Time, nil
8966
}
9067

9168
// RemovePendingTx remove pending tx from local pending txs.
9269
// It is called when the pending tx is included in the block.
93-
func (b *Broadcaster) RemovePendingTx(blockHeight int64, txHash string, sequence uint64, msgTypes []string) error {
70+
func (b *Broadcaster) RemovePendingTx(sequence uint64) error {
9471
err := b.deletePendingTx(sequence)
9572
if err != nil {
9673
return err
9774
}
9875

99-
b.logger.Info("tx inserted", zap.Int64("height", blockHeight), zap.Uint64("sequence", sequence), zap.String("tx_hash", txHash), zap.Strings("msg_types", msgTypes))
10076
b.dequeueLocalPendingTx()
101-
10277
return nil
10378
}
10479

@@ -124,7 +99,9 @@ func (b *Broadcaster) Start(ctx context.Context) error {
12499
break
125100
}
126101
b.logger.Warn(fmt.Sprintf("retry to handle processed msgs after %d seconds", int(2*math.Exp2(float64(retry)))), zap.Int("count", retry), zap.String("error", err.Error()))
127-
types.SleepWithRetry(ctx, retry)
102+
if types.SleepWithRetry(ctx, retry) {
103+
return nil
104+
}
128105
}
129106
if err != nil {
130107
return errors.Wrap(err, "failed to handle processed msgs")

node/broadcaster/tx.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ import (
1010

1111
"go.uber.org/zap"
1212

13+
"github.com/pkg/errors"
14+
1315
sdkerrors "cosmossdk.io/errors"
1416
abci "github.com/cometbft/cometbft/abci/types"
1517

@@ -50,8 +52,6 @@ func (b *Broadcaster) handleMsgError(err error) error {
5052
b.txf = b.txf.WithSequence(expected)
5153
}
5254

53-
// account sequence mismatched
54-
// TODO: handle mismatched sequence
5555
return err
5656
}
5757

@@ -237,11 +237,14 @@ func (b *Broadcaster) enqueueLocalPendingTx(tx btypes.PendingTxInfo) {
237237
b.pendingTxs = append(b.pendingTxs, tx)
238238
}
239239

240-
func (b *Broadcaster) peekLocalPendingTx() btypes.PendingTxInfo {
240+
func (b *Broadcaster) PeekLocalPendingTx() (btypes.PendingTxInfo, error) {
241241
b.pendingTxMu.Lock()
242242
defer b.pendingTxMu.Unlock()
243243

244-
return b.pendingTxs[0]
244+
if len(b.pendingTxs) == 0 {
245+
return btypes.PendingTxInfo{}, errors.New("no pending txs")
246+
}
247+
return b.pendingTxs[0], nil
245248
}
246249

247250
func (b Broadcaster) LenLocalPendingTx() int {

node/node.go

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -134,23 +134,9 @@ func (n *Node) Start(ctx context.Context) {
134134
n.broadcaster.BroadcastPendingProcessedMsgs()
135135
}
136136

137-
if n.cfg.ProcessType == nodetypes.PROCESS_TYPE_ONLY_BROADCAST {
138-
if n.broadcaster == nil {
139-
panic("broadcaster cannot be nil with nodetypes.PROCESS_TYPE_ONLY_BROADCAST")
140-
}
141-
142-
errGrp.Go(func() (err error) {
143-
defer func() {
144-
n.logger.Info("tx checker looper stopped")
145-
if r := recover(); r != nil {
146-
n.logger.Error("tx checker panic", zap.Any("recover", r))
147-
err = fmt.Errorf("tx checker panic: %v", r)
148-
}
149-
}()
150-
151-
return n.txChecker(ctx)
152-
})
153-
} else {
137+
enableEventHandler := true
138+
if n.cfg.ProcessType != nodetypes.PROCESS_TYPE_ONLY_BROADCAST {
139+
enableEventHandler = false
154140
errGrp.Go(func() (err error) {
155141
defer func() {
156142
n.logger.Info("block process looper stopped")
@@ -163,6 +149,18 @@ func (n *Node) Start(ctx context.Context) {
163149
return n.blockProcessLooper(ctx, n.cfg.ProcessType)
164150
})
165151
}
152+
153+
errGrp.Go(func() (err error) {
154+
defer func() {
155+
n.logger.Info("tx checker looper stopped")
156+
if r := recover(); r != nil {
157+
n.logger.Error("tx checker panic", zap.Any("recover", r))
158+
err = fmt.Errorf("tx checker panic: %v", r)
159+
}
160+
}()
161+
162+
return n.txChecker(ctx, enableEventHandler)
163+
})
166164
}
167165

168166
func (n Node) AccountCodec() address.Codec {

node/process.go

Lines changed: 50 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@ func (n *Node) blockProcessLooper(ctx context.Context, processType nodetypes.Blo
2424
case <-ctx.Done():
2525
return nil
2626
case <-timer.C:
27-
types.SleepWithRetry(ctx, consecutiveErrors)
27+
if types.SleepWithRetry(ctx, consecutiveErrors) {
28+
return nil
29+
}
2830
consecutiveErrors++
2931
}
3032

@@ -130,14 +132,6 @@ func (n *Node) handleNewBlock(ctx context.Context, block *rpccoretypes.ResultBlo
130132
return err
131133
}
132134

133-
// handle broadcaster first to check pending txs
134-
if n.broadcaster != nil {
135-
err := n.broadcaster.HandleNewBlock(block, blockResult, latestChainHeight)
136-
if err != nil {
137-
return err
138-
}
139-
}
140-
141135
if n.beginBlockHandler != nil {
142136
err := n.beginBlockHandler(ctx, nodetypes.BeginBlockArgs{
143137
BlockID: block.BlockID.Hash,
@@ -212,7 +206,10 @@ func (n *Node) handleEvent(ctx context.Context, blockHeight int64, blockTime tim
212206
}
213207

214208
// txChecker checks pending txs and handle events if the tx is included in the block
215-
func (n *Node) txChecker(ctx context.Context) error {
209+
// in the case that the tx hash is not indexed by the node even if the tx is processed,
210+
// event handler will not be called.
211+
// so, it is recommended to use the event handler only for the check event (e.g. logs)
212+
func (n *Node) txChecker(ctx context.Context, enableEventHandler bool) error {
216213
if n.broadcaster == nil {
217214
return nil
218215
}
@@ -225,39 +222,64 @@ func (n *Node) txChecker(ctx context.Context) error {
225222
case <-ctx.Done():
226223
return nil
227224
case <-timer.C:
228-
types.SleepWithRetry(ctx, consecutiveErrors)
225+
if n.broadcaster.LenLocalPendingTx() == 0 {
226+
continue
227+
}
228+
229+
n.logger.Debug("remaining pending txs", zap.Int("count", n.broadcaster.LenLocalPendingTx()))
230+
231+
if types.SleepWithRetry(ctx, consecutiveErrors) {
232+
return nil
233+
}
229234
consecutiveErrors++
230235
}
231236

232-
pendingTx, res, blockTime, err := n.broadcaster.CheckPendingTx(ctx)
237+
pendingTx, err := n.broadcaster.PeekLocalPendingTx()
233238
if err != nil {
234239
return err
235-
} else if pendingTx == nil || res == nil {
236-
// tx not found
237-
continue
238240
}
239241

240-
if len(n.eventHandlers) != 0 {
241-
events := res.TxResult.GetEvents()
242-
for eventIndex, event := range events {
243-
select {
244-
case <-ctx.Done():
245-
return nil
246-
default:
247-
}
242+
height := int64(0)
248243

249-
err := n.handleEvent(ctx, res.Height, blockTime, 0, event)
250-
if err != nil {
251-
n.logger.Error("failed to handle event", zap.String("tx_hash", pendingTx.TxHash), zap.Int("event_index", eventIndex), zap.String("error", err.Error()))
252-
break
244+
res, blockTime, err := n.broadcaster.CheckPendingTx(ctx, pendingTx)
245+
if errors.Is(err, types.ErrTxNotFound) {
246+
// tx not found
247+
continue
248+
} else if err != nil {
249+
return err
250+
} else if res != nil {
251+
// tx found
252+
height = res.Height
253+
// it only handles the tx if node is only broadcasting txs, not processing blocks
254+
if enableEventHandler && len(n.eventHandlers) != 0 {
255+
events := res.TxResult.GetEvents()
256+
for eventIndex, event := range events {
257+
select {
258+
case <-ctx.Done():
259+
return nil
260+
default:
261+
}
262+
263+
err := n.handleEvent(ctx, res.Height, blockTime, 0, event)
264+
if err != nil {
265+
n.logger.Error("failed to handle event", zap.String("tx_hash", pendingTx.TxHash), zap.Int("event_index", eventIndex), zap.String("error", err.Error()))
266+
break
267+
}
253268
}
254269
}
255270
}
256271

257-
err = n.broadcaster.RemovePendingTx(res.Height, pendingTx.TxHash, pendingTx.Sequence, pendingTx.MsgTypes)
272+
err = n.broadcaster.RemovePendingTx(pendingTx.Sequence)
258273
if err != nil {
259274
return err
260275
}
276+
n.logger.Info("tx inserted",
277+
zap.Int64("height", height),
278+
zap.Uint64("sequence", pendingTx.Sequence),
279+
zap.String("tx_hash", pendingTx.TxHash),
280+
zap.Strings("msg_types", pendingTx.MsgTypes),
281+
zap.Int("pending_txs", n.broadcaster.LenLocalPendingTx()),
282+
)
261283
consecutiveErrors = 0
262284
}
263285
}

types/errors.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,5 @@ package types
33
import "errors"
44

55
var ErrKeyNotSet = errors.New("key not set")
6+
var ErrAccountSequenceMismatch = errors.New("account sequence mismatch")
7+
var ErrTxNotFound = errors.New("tx not found")

0 commit comments

Comments
 (0)