@@ -3,6 +3,7 @@ package broadcaster
3
3
import (
4
4
"encoding/hex"
5
5
"fmt"
6
+ "regexp"
6
7
"slices"
7
8
"sync"
8
9
"time"
@@ -22,6 +23,8 @@ import (
22
23
"github.com/initia-labs/opinit-bots/types"
23
24
)
24
25
26
+ var txNotFoundRegex = regexp .MustCompile ("Internal error: tx ([A-Fa-f0-9]+) not found" )
27
+
25
28
type Broadcaster struct {
26
29
cfg btypes.BroadcasterConfig
27
30
@@ -143,57 +146,42 @@ func (b *Broadcaster) loadPendingTxs(ctx types.Context, stage types.BasicDB, las
143
146
}
144
147
ctx .Logger ().Debug ("load pending txs" , zap .Int ("count" , len (pendingTxs )))
145
148
146
- if len (pendingTxs ) == 0 {
147
- return nil
148
- }
149
-
150
- pendingTxTime := time .Unix (0 , pendingTxs [0 ].Timestamp ).UTC ()
151
-
152
- // if we have pending txs, wait until timeout
153
- if timeoutTime := pendingTxTime .Add (b .cfg .TxTimeout ); lastBlockTime .Before (timeoutTime ) {
154
- waitingTime := timeoutTime .Sub (lastBlockTime )
155
- timer := time .NewTimer (waitingTime )
156
- defer timer .Stop ()
157
-
158
- ctx .Logger ().Info ("waiting for pending txs to be processed" , zap .Duration ("waiting_time" , waitingTime ))
149
+ pollingTimer := time .NewTicker (ctx .PollingInterval ())
150
+ defer pollingTimer .Stop ()
159
151
160
- pollingTimer := time .NewTicker (ctx .PollingInterval ())
161
- defer pollingTimer .Stop ()
152
+ reProcessingTxs := make ([]btypes.PendingTxInfo , 0 )
162
153
163
- WAITLOOP:
164
- for {
165
- if len (pendingTxs ) == 0 {
166
- return nil
167
- }
168
-
169
- select {
170
- case <- ctx .Done ():
171
- return ctx .Err ()
172
- case <- timer .C :
173
- break WAITLOOP
174
- case <- pollingTimer .C :
175
- }
154
+ for txIndex := 0 ; txIndex < len (pendingTxs ); {
155
+ select {
156
+ case <- ctx .Done ():
157
+ return ctx .Err ()
158
+ case <- pollingTimer .C :
159
+ }
176
160
177
- txHash , err := hex .DecodeString (pendingTxs [0 ].TxHash )
178
- if err != nil {
179
- return err
180
- }
161
+ txHash , err := hex .DecodeString (pendingTxs [txIndex ].TxHash )
162
+ if err != nil {
163
+ return err
164
+ }
181
165
182
- res , err := b .rpcClient .QueryTx (ctx , txHash )
183
- if err == nil && res != nil && res .TxResult .Code == 0 {
184
- ctx .Logger ().Debug ("transaction successfully included" ,
185
- zap .String ("hash" , pendingTxs [0 ].TxHash ),
186
- zap .Int64 ("height" , res .Height ))
187
- err = DeletePendingTx (b .db , pendingTxs [0 ])
188
- if err != nil {
189
- return err
190
- }
191
- pendingTxs = pendingTxs [1 :]
192
- } else if err == nil && res != nil {
193
- ctx .Logger ().Warn ("transaction failed" ,
194
- zap .String ("hash" , pendingTxs [0 ].TxHash ),
195
- zap .Uint32 ("code" , res .TxResult .Code ),
196
- zap .String ("log" , res .TxResult .Log ))
166
+ res , err := b .rpcClient .QueryTx (ctx , txHash )
167
+ if err == nil && res != nil && res .TxResult .Code == 0 {
168
+ ctx .Logger ().Debug ("transaction successfully included" ,
169
+ zap .String ("hash" , pendingTxs [txIndex ].TxHash ),
170
+ zap .Int64 ("height" , res .Height ))
171
+ txIndex ++
172
+ } else if err == nil && res != nil {
173
+ ctx .Logger ().Warn ("transaction failed" ,
174
+ zap .String ("hash" , pendingTxs [txIndex ].TxHash ),
175
+ zap .Uint32 ("code" , res .TxResult .Code ),
176
+ zap .String ("log" , res .TxResult .Log ))
177
+ reProcessingTxs = append (reProcessingTxs , pendingTxs [txIndex ])
178
+ txIndex ++
179
+ } else if err != nil && txNotFoundRegex .FindStringSubmatch (err .Error ()) != nil {
180
+ pendingTxTime := time .Unix (0 , pendingTxs [txIndex ].Timestamp ).UTC ()
181
+ timeoutTime := pendingTxTime .Add (b .cfg .TxTimeout )
182
+ if lastBlockTime .After (timeoutTime ) {
183
+ reProcessingTxs = append (reProcessingTxs , pendingTxs [txIndex :]... )
184
+ break
197
185
}
198
186
}
199
187
}
@@ -203,11 +191,13 @@ func (b *Broadcaster) loadPendingTxs(ctx types.Context, stage types.BasicDB, las
203
191
return err
204
192
}
205
193
206
- processedMsgsBatch , err := b .pendingTxsToProcessedMsgsBatch (ctx , pendingTxs )
207
- if err != nil {
208
- return err
194
+ if len (reProcessingTxs ) != 0 {
195
+ processedMsgsBatch , err := b .pendingTxsToProcessedMsgsBatch (ctx , reProcessingTxs )
196
+ if err != nil {
197
+ return err
198
+ }
199
+ b .pendingProcessedMsgsBatch = append (b .pendingProcessedMsgsBatch , processedMsgsBatch ... )
209
200
}
210
- b .pendingProcessedMsgsBatch = append (b .pendingProcessedMsgsBatch , processedMsgsBatch ... )
211
201
return nil
212
202
}
213
203
0 commit comments