diff --git a/mempool/v0/clist_mempool.go b/mempool/v0/clist_mempool.go index 20d8e8f8834..53117db2524 100644 --- a/mempool/v0/clist_mempool.go +++ b/mempool/v0/clist_mempool.go @@ -337,7 +337,6 @@ func (mem *CListMempool) addTx(memTx *mempoolTx, isOracleTx bool) { e := mem.txs.PushBack(memTx) mem.txsMap.Store(txKey, e) - mem.txsMap.Store(memTx.tx.Key(), e) atomic.AddInt64(&mem.txsBytes, int64(len(memTx.tx))) mem.metrics.TxSizeBytes.Observe(float64(len(memTx.tx))) } @@ -347,6 +346,7 @@ func (mem *CListMempool) addTx(memTx *mempoolTx, isOracleTx bool) { // - resCbRecheck (lock not held) if tx was invalidated func (mem *CListMempool) removeTx(tx types.Tx, elem *clist.CElement, removeFromCache bool) { txKey := tx.Key() + mem.txs.Remove(elem) elem.DetachPrev() diff --git a/mempool/v0/clist_mempool_test.go b/mempool/v0/clist_mempool_test.go index 6454c146b9f..d3a89e207e6 100644 --- a/mempool/v0/clist_mempool_test.go +++ b/mempool/v0/clist_mempool_test.go @@ -130,141 +130,6 @@ func checkTxs(t *testing.T, mp mempool.Mempool, count int, peerID uint16) types. return txs } -func TestOraclePriorityResp(t *testing.T) { - app := oracle.NewApplication(true) - cc := proxy.NewLocalClientCreator(app) - - mp, cleanup := newMempoolWithApp(cc) - defer cleanup() - - appConnCon, _ := cc.NewABCIClient() - appConnCon.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "consensus")) - err := appConnCon.Start() - require.Nil(t, err) - - cacheMap := make(map[string]struct{}) - deliverTxsRange := func(start, end int) { - // Deliver some txs. - for i := start; i < end; i++ { - - // This will succeed - txBytes := make([]byte, 8) - binary.BigEndian.PutUint64(txBytes, uint64(i)) - err := mp.CheckTx(txBytes, nil, mempool.TxInfo{}) - _, cached := cacheMap[string(txBytes)] - if cached { - require.NotNil(t, err, "expected error for cached tx") - } else { - require.Nil(t, err, "expected no err for uncached tx") - } - cacheMap[string(txBytes)] = struct{}{} - - // Duplicates are cached and should return error - err = mp.CheckTx(txBytes, nil, mempool.TxInfo{}) - require.NotNil(t, err, "Expected error after CheckTx on duplicated tx") - } - } - - reapCheck := func(exp int) { - txs := mp.ReapMaxBytesMaxGas(-1, -1) - require.Equal(t, len(txs), exp, fmt.Sprintf("Expected to reap %v txs but got %v", exp, len(txs))) - - oddFound := false - for _, tx := range txs { - if binary.BigEndian.Uint64(tx)%2 != 0 { - oddFound = true - } else if oddFound { - require.Fail(t, "oracle txs must be comes first") - } - } - } - - oracleTxsCheck := func(exp int) { - require.Equal(t, mp.oracleTxs.Len(), exp, fmt.Sprintf("Expected to reap %v txs but got %v", exp, mp.oracleTxs.Len())) - } - - updateRange := func(start, end int) { - txs := make([]types.Tx, 0) - for i := start; i < end; i++ { - txBytes := make([]byte, 8) - binary.BigEndian.PutUint64(txBytes, uint64(i)) - txs = append(txs, txBytes) - } - if err := mp.Update(0, txs, abciResponses(len(txs), abci.CodeTypeOK), nil, nil); err != nil { - t.Error(err) - } - } - - commitRange := func(start, end int) { - // Deliver some txs. - for i := start; i < end; i++ { - txBytes := make([]byte, 8) - binary.BigEndian.PutUint64(txBytes, uint64(i)) - res, err := appConnCon.DeliverTxSync(abci.RequestDeliverTx{Tx: txBytes}) - if err != nil { - t.Errorf("client error committing tx: %v", err) - } - if res.IsErr() { - t.Errorf("error committing tx. Code:%v result:%X log:%v", - res.Code, res.Data, res.Log) - } - } - res, err := appConnCon.CommitSync() - if err != nil { - t.Errorf("client error committing: %v", err) - } - if len(res.Data) != 8 { - t.Errorf("error committing. Hash:%X", res.Data) - } - } - - //---------------------------------------- - - // Deliver some txs. - deliverTxsRange(0, 100) - - // Reap the txs. - reapCheck(100) - - // Oracle txs must be 50 - oracleTxsCheck(50) - - // Reap again. We should get the same amount - reapCheck(100) - - // Deliver 0 to 999, we should reap 900 new txs - // because 100 were already counted. - deliverTxsRange(0, 1000) - - // Reap the txs. - reapCheck(1000) - - // Oracle txs must be 500 - oracleTxsCheck(500) - - // Reap again. We should get the same amount - reapCheck(1000) - - // Commit from the conensus AppConn - commitRange(0, 500) - updateRange(0, 500) - - // We should have 500 left. - reapCheck(500) - - // Oracle txs must be 250 - oracleTxsCheck(250) - - // Deliver 100 invalid txs and 100 valid txs - deliverTxsRange(900, 1100) - - // We should have 600 now. - reapCheck(600) - - // Oracle txs must be 250 - oracleTxsCheck(300) -} - func TestReapMaxBytesMaxGas(t *testing.T) { app := kvstore.NewApplication() cc := proxy.NewLocalClientCreator(app) @@ -537,6 +402,140 @@ func TestTxsAvailable(t *testing.T) { ensureNoFire(t, mp.TxsAvailable(), timeoutMS) } +func TestOraclePriorityResp(t *testing.T) { + app := oracle.NewApplication(true) + cc := proxy.NewLocalClientCreator(app) + + mp, cleanup := newMempoolWithApp(cc) + defer cleanup() + + appConnCon, _ := cc.NewABCIClient() + appConnCon.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "consensus")) + err := appConnCon.Start() + require.Nil(t, err) + + cacheMap := make(map[string]struct{}) + deliverTxsRange := func(start, end int) { + // Deliver some txs. + for i := start; i < end; i++ { + + // This will succeed + txBytes := make([]byte, 8) + binary.BigEndian.PutUint64(txBytes, uint64(i)) + err := mp.CheckTx(txBytes, nil, mempool.TxInfo{}) + _, cached := cacheMap[string(txBytes)] + if cached { + require.NotNil(t, err, "expected error for cached tx") + } else { + require.Nil(t, err, "expected no err for uncached tx") + } + cacheMap[string(txBytes)] = struct{}{} + + // Duplicates are cached and should return error + err = mp.CheckTx(txBytes, nil, mempool.TxInfo{}) + require.NotNil(t, err, "Expected error after CheckTx on duplicated tx") + } + } + + reapCheck := func(exp int) { + txs := mp.ReapMaxBytesMaxGas(-1, -1) + require.Equal(t, len(txs), exp, fmt.Sprintf("Expected to reap %v txs but got %v", exp, len(txs))) + + oddFound := false + for _, tx := range txs { + if binary.BigEndian.Uint64(tx)%2 != 0 { + oddFound = true + } else if oddFound { + require.Fail(t, "oracle txs must be comes first") + } + } + } + + oracleTxsCheck := func(exp int) { + require.Equal(t, mp.oracleTxs.Len(), exp, fmt.Sprintf("Expected to reap %v txs but got %v", exp, mp.oracleTxs.Len())) + } + + updateRange := func(start, end int) { + txs := make([]types.Tx, 0) + for i := start; i < end; i++ { + txBytes := make([]byte, 8) + binary.BigEndian.PutUint64(txBytes, uint64(i)) + txs = append(txs, txBytes) + } + if err := mp.Update(0, txs, abciResponses(len(txs), abci.CodeTypeOK), nil, nil); err != nil { + t.Error(err) + } + } + + commitRange := func(start, end int) { + // Deliver some txs. + for i := start; i < end; i++ { + txBytes := make([]byte, 8) + binary.BigEndian.PutUint64(txBytes, uint64(i)) + res, err := appConnCon.DeliverTxSync(abci.RequestDeliverTx{Tx: txBytes}) + if err != nil { + t.Errorf("client error committing tx: %v", err) + } + if res.IsErr() { + t.Errorf("error committing tx. Code:%v result:%X log:%v", + res.Code, res.Data, res.Log) + } + } + res, err := appConnCon.CommitSync() + if err != nil { + t.Errorf("client error committing: %v", err) + } + if len(res.Data) != 8 { + t.Errorf("error committing. Hash:%X", res.Data) + } + } + + //---------------------------------------- + + // Deliver some txs. + deliverTxsRange(0, 100) + + // Reap the txs. + reapCheck(100) + + // Oracle txs must be 50 + oracleTxsCheck(50) + + // Reap again. We should get the same amount + reapCheck(100) + + // Deliver 0 to 999, we should reap 900 new txs + // because 100 were already counted. + deliverTxsRange(0, 1000) + + // Reap the txs. + reapCheck(1000) + + // Oracle txs must be 500 + oracleTxsCheck(500) + + // Reap again. We should get the same amount + reapCheck(1000) + + // Commit from the conensus AppConn + commitRange(0, 500) + updateRange(0, 500) + + // We should have 500 left. + reapCheck(500) + + // Oracle txs must be 250 + oracleTxsCheck(250) + + // Deliver 100 invalid txs and 100 valid txs + deliverTxsRange(900, 1100) + + // We should have 600 now. + reapCheck(600) + + // Oracle txs must be 250 + oracleTxsCheck(300) +} func TestSerialReap(t *testing.T) { app := kvstore.NewApplication() cc := proxy.NewLocalClientCreator(app)