Skip to content

Commit 3253118

Browse files
committed
flush queued txs if there is newly added txs from mempool or blocks
1 parent 744563d commit 3253118

File tree

8 files changed

+131
-6
lines changed

8 files changed

+131
-6
lines changed

Diff for: indexer/abci.go

+15-1
Original file line numberDiff line numberDiff line change
@@ -44,14 +44,15 @@ func (e *EVMIndexerImpl) ListenFinalizeBlock(ctx context.Context, req abci.Reque
4444
cumulativeGasUsed := uint64(0)
4545
ethTxs := make([]*coretypes.Transaction, 0, len(req.Txs))
4646
receipts := make([]*coretypes.Receipt, 0, len(req.Txs))
47+
senderNonceMap := make(map[string]uint64)
4748
for idx, txBytes := range req.Txs {
4849
tx, err := e.txConfig.TxDecoder()(txBytes)
4950
if err != nil {
5051
e.logger.Error("failed to decode tx", "err", err)
5152
continue
5253
}
5354

54-
ethTx, _, err := keeper.NewTxUtils(e.evmKeeper).ConvertCosmosTxToEthereumTx(sdkCtx, tx)
55+
ethTx, sender, err := keeper.NewTxUtils(e.evmKeeper).ConvertCosmosTxToEthereumTx(sdkCtx, tx)
5556
if err != nil {
5657
e.logger.Error("failed to convert CosmosTx to EthTx", "err", err)
5758
return err
@@ -82,6 +83,7 @@ func (e *EVMIndexerImpl) ListenFinalizeBlock(ctx context.Context, req abci.Reque
8283

8384
txIndex++
8485
ethTxs = append(ethTxs, ethTx)
86+
senderNonceMap[sender.Hex()] = ethTx.Nonce()
8587

8688
// extract logs and contract address from tx results
8789
ethLogs, contractAddr, err := extractLogsAndContractAddr(txStatus, txResult.Data, ethTx.To() == nil)
@@ -201,6 +203,18 @@ func (e *EVMIndexerImpl) ListenFinalizeBlock(ctx context.Context, req abci.Reque
201203
}
202204
}()
203205

206+
// flush queued txs to mempool
207+
if e.flushQueuedTxs != nil {
208+
go func() {
209+
for senderHex, nonce := range senderNonceMap {
210+
// try to flush queued txs from the next nonce
211+
if err := e.flushQueuedTxs(senderHex, nonce+1); err != nil {
212+
e.logger.Error("failed to flush queued txs", "err", err)
213+
}
214+
}
215+
}()
216+
}
217+
204218
// TODO - currently state changes are not supported in abci listener, so we track cosmos block hash at x/evm preblocker.
205219
// - https://github.com/cosmos/cosmos-sdk/issues/22246
206220
//

Diff for: indexer/abci_test.go

+32
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,38 @@ func Test_ListenFinalizeBlock_Subscribe(t *testing.T) {
112112
done()
113113
}
114114

115+
func Test_ListenFinalizeBlock_FlushQueuedTxs(t *testing.T) {
116+
app, addrs, privKeys := tests.CreateApp(t)
117+
indexer := app.EVMIndexer()
118+
defer app.Close()
119+
120+
received := make(map[string]uint64)
121+
122+
wg := sync.WaitGroup{}
123+
wg.Add(1)
124+
indexer.RegisterFlushQueuedTxs(func(senderHex string, accSeq uint64) error {
125+
received[senderHex] = accSeq
126+
wg.Done()
127+
return nil
128+
})
129+
130+
tx, _ := tests.GenerateCreateERC20Tx(t, app, privKeys[0])
131+
132+
// load current sequence
133+
ctx, err := app.CreateQueryContext(0, false)
134+
require.NoError(t, err)
135+
seq, err := app.AccountKeeper.GetSequence(ctx, addrs[0].Bytes())
136+
require.NoError(t, err)
137+
138+
_, finalizeRes := tests.ExecuteTxs(t, app, tx)
139+
tests.CheckTxResult(t, finalizeRes.TxResults[0], true)
140+
141+
wg.Wait()
142+
143+
require.Len(t, received, 1)
144+
require.Equal(t, uint64(seq+1), received[addrs[0].Hex()])
145+
}
146+
115147
func Test_ListenFinalizeBlock_ContractCreation(t *testing.T) {
116148
app, _, privKeys := tests.CreateApp(t)
117149
indexer := app.EVMIndexer()

Diff for: indexer/indexer.go

+16
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,9 @@ type EVMIndexer interface {
5454
MempoolWrapper(mempool mempool.Mempool) mempool.Mempool
5555
TxInMempool(hash common.Hash) *rpctypes.RPCTransaction
5656

57+
// register flush queued txs function to
58+
RegisterFlushQueuedTxs(f FlushQueuedTxs)
59+
5760
// bloom
5861
ReadBloomBits(ctx context.Context, section uint64, index uint32) ([]byte, error)
5962
PeekBloomBitsNextSection(ctx context.Context) (uint64, error)
@@ -63,6 +66,9 @@ type EVMIndexer interface {
6366
Stop()
6467
}
6568

69+
// FlushQueuedTxs is a function to flush queued transactions.
70+
type FlushQueuedTxs = func(senderHex string, accSeq uint64) error
71+
6672
// EVMIndexerImpl implements EVMIndexer.
6773
type EVMIndexerImpl struct {
6874
enabled bool
@@ -102,6 +108,9 @@ type EVMIndexerImpl struct {
102108

103109
// txPendingMap is a map to store tx hashes in pending state.
104110
txPendingMap *ttlcache.Cache[common.Hash, *rpctypes.RPCTransaction]
111+
112+
// flushQueuedTxs is a function to flush queued transactions to mempool.
113+
flushQueuedTxs FlushQueuedTxs
105114
}
106115

107116
func NewEVMIndexer(
@@ -163,6 +172,8 @@ func NewEVMIndexer(
163172
// pending tx lifetime is 1 minute in indexer
164173
ttlcache.WithTTL[common.Hash, *rpctypes.RPCTransaction](time.Minute),
165174
),
175+
176+
flushQueuedTxs: nil,
166177
}
167178

168179
schema, err := sb.Build()
@@ -189,6 +200,11 @@ func (e *EVMIndexerImpl) Subscribe() (chan *coretypes.Header, chan []*coretypes.
189200
return blockChan, logsChan, pendingChan
190201
}
191202

203+
// RegisterFlushQueuedTxs registers a function to flush queued transactions.
204+
func (e *EVMIndexerImpl) RegisterFlushQueuedTxs(f FlushQueuedTxs) {
205+
e.flushQueuedTxs = f
206+
}
207+
192208
// blockEvents is a struct to emit block events.
193209
type blockEvents struct {
194210
header *coretypes.Header

Diff for: indexer/mempool.go

+13-1
Original file line numberDiff line numberDiff line change
@@ -43,14 +43,17 @@ func (m *MempoolWrapper) CountTx() int {
4343
// Insert implements mempool.Mempool.
4444
func (m *MempoolWrapper) Insert(ctx context.Context, tx sdk.Tx) error {
4545
txUtils := evmkeeper.NewTxUtils(m.indexer.evmKeeper)
46-
ethTx, _, err := txUtils.ConvertCosmosTxToEthereumTx(ctx, tx)
46+
ethTx, sender, err := txUtils.ConvertCosmosTxToEthereumTx(ctx, tx)
4747
if err != nil {
4848
m.indexer.logger.Error("failed to convert CosmosTx to EthTx", "err", err)
4949
return err
5050
}
5151

5252
if ethTx != nil {
5353
ethTxHash := ethTx.Hash()
54+
senderHex := sender.Hex()
55+
nonce := ethTx.Nonce()
56+
5457
rpcTx := rpctypes.NewRPCTransaction(ethTx, common.Hash{}, 0, 0, ethTx.ChainId())
5558

5659
m.indexer.logger.Debug("inserting tx into mempool", "pending len", m.indexer.txPendingMap.Len(), "ethTxHash", ethTxHash)
@@ -62,6 +65,15 @@ func (m *MempoolWrapper) Insert(ctx context.Context, tx sdk.Tx) error {
6265
pendingChan <- rpcTx
6366
}
6467
}()
68+
69+
if m.indexer.flushQueuedTxs != nil {
70+
go func() {
71+
// try to flush queued txs from the next nonce
72+
if err := m.indexer.flushQueuedTxs(senderHex, nonce+1); err != nil {
73+
m.indexer.logger.Error("failed to flush queued txs", "err", err)
74+
}
75+
}()
76+
}
6577
}
6678

6779
return m.mempool.Insert(ctx, tx)

Diff for: indexer/mempool_test.go

+34
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,40 @@ func Test_Mempool_Subscribe(t *testing.T) {
5555
done()
5656
}
5757

58+
func Test_Mempool_TriggerFlushQueuedTxs(t *testing.T) {
59+
app, addrs, privKeys := tests.CreateApp(t)
60+
indexer := app.EVMIndexer()
61+
defer app.Close()
62+
63+
received := make(map[string]uint64)
64+
65+
wg := sync.WaitGroup{}
66+
wg.Add(1)
67+
indexer.RegisterFlushQueuedTxs(func(senderHex string, accSeq uint64) error {
68+
received[senderHex] = accSeq
69+
wg.Done()
70+
return nil
71+
})
72+
73+
tx, _ := tests.GenerateCreateERC20Tx(t, app, privKeys[0])
74+
75+
noopMempool := &mempool.NoOpMempool{}
76+
mempool := indexer.MempoolWrapper(noopMempool)
77+
78+
// insert tx into mempool
79+
ctx, err := app.CreateQueryContext(0, false)
80+
require.NoError(t, err)
81+
seq, err := app.AccountKeeper.GetSequence(ctx, addrs[0].Bytes())
82+
require.NoError(t, err)
83+
err = mempool.Insert(ctx, tx)
84+
require.NoError(t, err)
85+
86+
wg.Wait()
87+
88+
require.Len(t, received, 1)
89+
require.Equal(t, uint64(seq+1), received[addrs[0].Hex()])
90+
}
91+
5892
func consumeBlockLogsChan(blockCh <-chan *coretypes.Header, logChan <-chan []*coretypes.Log, duration int) {
5993
timer := time.NewTimer(time.Second * time.Duration(duration))
6094

Diff for: jsonrpc/backend/backend.go

+3
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,9 @@ func NewJSONRPCBackend(
149149
// Start the bloom bits servicing goroutines
150150
b.startBloomHandlers(evmconfig.SectionSize)
151151

152+
// Register flush queued txs function
153+
b.app.EVMIndexer().RegisterFlushQueuedTxs(b.FlushQueuedTxs)
154+
152155
return b, nil
153156
}
154157

Diff for: jsonrpc/backend/tx.go

+17-4
Original file line numberDiff line numberDiff line change
@@ -75,10 +75,6 @@ func (b *JSONRPCBackend) SendTx(tx *coretypes.Transaction) error {
7575

7676
senderHex := common.BytesToAddress(sender.Bytes()).Hex()
7777

78-
// hold mutex for each sender
79-
accMut := b.acquireAccMut(senderHex)
80-
defer b.releaseAccMut(senderHex, accMut)
81-
8278
checkCtx := b.app.GetContextForCheckTx(nil)
8379
if acc := b.app.AccountKeeper.GetAccount(checkCtx, sender); acc != nil {
8480
accSeq = acc.GetSequence()
@@ -93,9 +89,26 @@ func (b *JSONRPCBackend) SendTx(tx *coretypes.Transaction) error {
9389

9490
txHash := tx.Hash()
9591
b.queuedTxHashes.Store(txHash, cacheKey)
92+
93+
// hold mutex for each sender
94+
accMut := b.acquireAccMut(senderHex)
9695
_ = b.queuedTxs.Add(cacheKey, txQueueItem{hash: txHash, bytes: txBytes, body: tx, sender: senderHex})
96+
b.releaseAccMut(senderHex, accMut)
9797

9898
// check if there are queued txs which can be sent
99+
if err = b.FlushQueuedTxs(senderHex, accSeq); err != nil {
100+
return err
101+
}
102+
103+
return nil
104+
}
105+
106+
// flush the queued transactions for the given sender only if the sequence matches
107+
func (b *JSONRPCBackend) FlushQueuedTxs(senderHex string, accSeq uint64) error {
108+
// hold mutex for each sender
109+
accMut := b.acquireAccMut(senderHex)
110+
defer b.releaseAccMut(senderHex, accMut)
111+
99112
for {
100113
cacheKey := fmt.Sprintf("%s-%d", senderHex, accSeq)
101114
if txQueueItem, ok := b.queuedTxs.Get(cacheKey); ok {

Diff for: x/evm/keeper/txutils.go

+1
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ func (u *TxUtils) ConvertEthereumTxToCosmosTx(ctx context.Context, ethTx *corety
9191
if err != nil {
9292
return nil, err
9393
}
94+
9495
// sig bytes
9596
v, r, s := ethTx.RawSignatureValues()
9697
sigBytes := make([]byte, 65)

0 commit comments

Comments
 (0)