From cf2d91ebade134f1de161b65d5c3326e77c6e7b1 Mon Sep 17 00:00:00 2001 From: andyzhang2023 Date: Wed, 18 Oct 2023 17:40:44 +0800 Subject: [PATCH 1/8] add option to reannounce local transactions --- cmd/geth/main.go | 1 + cmd/utils/flags.go | 9 +++++++++ core/txpool/txpool.go | 7 ++++--- 3 files changed, 14 insertions(+), 3 deletions(-) diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 3dc1bb9b40..b5ad595a84 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -89,6 +89,7 @@ var ( utils.TxPoolGlobalQueueFlag, utils.TxPoolLifetimeFlag, utils.TxPoolReannounceTimeFlag, + utils.TxPoolReannounceRemotesFlag, utils.SyncModeFlag, utils.SyncTargetFlag, utils.ExitWhenSyncedFlag, diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 8ae95e25cc..9b557b2843 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -465,6 +465,12 @@ var ( Value: ethconfig.Defaults.TxPool.ReannounceTime, Category: flags.TxPoolCategory, } + TxPoolReannounceRemotesFlag = &cli.BoolFlag{ + Name: "txpool.reannounceremotes", + Usage: "Wether reannnounce remote transactions or not(default = false)", + Value: ethconfig.Defaults.TxPool.ReannounceRemotes, + Category: flags.TxPoolCategory, + } // Performance tuning settings CacheFlag = &cli.IntFlag{ @@ -1663,6 +1669,9 @@ func setTxPool(ctx *cli.Context, cfg *txpool.Config) { if ctx.IsSet(TxPoolReannounceTimeFlag.Name) { cfg.ReannounceTime = ctx.Duration(TxPoolReannounceTimeFlag.Name) } + if ctx.IsSet(TxPoolReannounceRemotesFlag.Name) { + cfg.ReannounceRemotes = ctx.Bool(TxPoolReannounceRemotesFlag.Name) + } } func setEthash(ctx *cli.Context, cfg *ethconfig.Config) { diff --git a/core/txpool/txpool.go b/core/txpool/txpool.go index 03e679a08e..6ec4f63a48 100644 --- a/core/txpool/txpool.go +++ b/core/txpool/txpool.go @@ -183,8 +183,9 @@ type Config struct { AccountQueue uint64 // Maximum number of non-executable transaction slots permitted per account GlobalQueue uint64 // Maximum number of non-executable transaction slots for all accounts - Lifetime time.Duration // Maximum amount of time non-executable transaction are queued - ReannounceTime time.Duration // Duration for announcing local pending transactions again + Lifetime time.Duration // Maximum amount of time non-executable transaction are queued + ReannounceTime time.Duration // Duration for announcing local pending transactions again + ReannounceRemotes bool // Wether reannounce remote transactions or not } // DefaultConfig contains the default configurations for the transaction @@ -433,7 +434,7 @@ func (pool *TxPool) loop() { reannoTxs := func() []*types.Transaction { txs := make([]*types.Transaction, 0) for addr, list := range pool.pending { - if !pool.locals.contains(addr) { + if !pool.config.ReannounceRemotes && !pool.locals.contains(addr) { continue } From 9cfc55bf26c8c8f9d0e81bbd280d0b8c6fcb425b Mon Sep 17 00:00:00 2001 From: andyzhang2023 Date: Tue, 28 Nov 2023 18:13:58 +0800 Subject: [PATCH 2/8] add metrics for invalid transactions detail --- core/txpool/invalid.go | 68 ++++++++++++++++++++++++++++++++++ core/txpool/txpool.go | 27 ++++++++++++++ eth/handler.go | 6 +++ eth/protocols/eth/broadcast.go | 4 +- 4 files changed, 103 insertions(+), 2 deletions(-) create mode 100644 core/txpool/invalid.go diff --git a/core/txpool/invalid.go b/core/txpool/invalid.go new file mode 100644 index 0000000000..637919d13c --- /dev/null +++ b/core/txpool/invalid.go @@ -0,0 +1,68 @@ +package txpool + +import ( + "github.com/ethereum/go-ethereum/metrics" +) + +const ( + AlreadyKnown = "AlreadyKnown" + TypeNotSupportDeposit = "TypeNotSupportDeposit" + TypeNotSupport1559 = "TypeNotSupport1559" + TypeNotSupport2718 = "TypeNotSupport2718" + MissingTransaction = "MissingTransaction" + OversizedData = "OversizedData" + MaxInitCodeSizeExceeded = "MaxInitCodeSizeExceeded" + NegativeValue = "NegativeValue" + GasLimit = "GasLimit" + FeeCapVeryHigh = "FeeCapVeryHigh" + TipVeryHigh = "TipVeryHigh" + TipAboveFeeCap = "TipAboveFeeCap" + InvalidSender = "InvalidSender" + Underpriced = "Underpriced" + NonceTooLow = "NonceTooLow" + InsufficientFunds = "InsufficientFunds" + Overdraft = "Overdraft" + IntrinsicGas = "IntrinsicGas" + Throttle = "Throttle" + Overflow = "Overflow" + FutureReplacePending = "FutureReplacePending" + ReplaceUnderpriced = "ReplaceUnderpriced" + QueuedDiscard = "QueueDiscard" + GasUnitOverflow = "GasUnitOverflow" +) + +func meter(err string) metrics.Meter { + return metrics.GetOrRegisterMeter("txpool/invalid/"+err, nil) +} + +func init() { + // init the metrics + for _, err := range []string{ + AlreadyKnown, + TypeNotSupportDeposit, + TypeNotSupport1559, + TypeNotSupport2718, + MissingTransaction, + OversizedData, + MaxInitCodeSizeExceeded, + NegativeValue, + GasLimit, + FeeCapVeryHigh, + TipVeryHigh, + TipAboveFeeCap, + InvalidSender, + Underpriced, + NonceTooLow, + InsufficientFunds, + Overdraft, + IntrinsicGas, + Throttle, + Overflow, + FutureReplacePending, + ReplaceUnderpriced, + QueuedDiscard, + GasUnitOverflow, + } { + meter(err).Mark(0) + } +} diff --git a/core/txpool/txpool.go b/core/txpool/txpool.go index 03e679a08e..376e9805a9 100644 --- a/core/txpool/txpool.go +++ b/core/txpool/txpool.go @@ -652,55 +652,68 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error { // This is for spam protection, not consensus, // as the external engine-API user authenticates deposits. if tx.Type() == types.DepositTxType { + meter(TypeNotSupportDeposit).Mark(1) return core.ErrTxTypeNotSupported } // Accept only legacy transactions until EIP-2718/2930 activates. if !pool.eip2718 && tx.Type() != types.LegacyTxType { + meter(TypeNotSupport2718).Mark(1) return core.ErrTxTypeNotSupported } // Reject dynamic fee transactions until EIP-1559 activates. if !pool.eip1559 && tx.Type() == types.DynamicFeeTxType { + meter(TypeNotSupport1559).Mark(1) return core.ErrTxTypeNotSupported } // Reject transactions over defined size to prevent DOS attacks if tx.Size() > txMaxSize { + meter(OversizedData).Mark(1) return ErrOversizedData } // Check whether the init code size has been exceeded. if pool.shanghai && tx.To() == nil && len(tx.Data()) > params.MaxInitCodeSize { + meter(MaxInitCodeSizeExceeded).Mark(1) return fmt.Errorf("%w: code size %v limit %v", core.ErrMaxInitCodeSizeExceeded, len(tx.Data()), params.MaxInitCodeSize) } // Transactions can't be negative. This may never happen using RLP decoded // transactions but may occur if you create a transaction using the RPC. if tx.Value().Sign() < 0 { + meter(NegativeValue).Mark(1) return ErrNegativeValue } // Ensure the transaction doesn't exceed the current block limit gas. if pool.currentMaxGas < tx.Gas() { + meter(GasLimit).Mark(1) return ErrGasLimit } // Sanity check for extremely large numbers if tx.GasFeeCap().BitLen() > 256 { + meter(FeeCapVeryHigh).Mark(1) return core.ErrFeeCapVeryHigh } if tx.GasTipCap().BitLen() > 256 { + meter(TipVeryHigh).Mark(1) return core.ErrTipVeryHigh } // Ensure gasFeeCap is greater than or equal to gasTipCap. if tx.GasFeeCapIntCmp(tx.GasTipCap()) < 0 { + meter(TipAboveFeeCap).Mark(1) return core.ErrTipAboveFeeCap } // Make sure the transaction is signed properly. from, err := types.Sender(pool.signer, tx) if err != nil { + meter(InvalidSender).Mark(1) return ErrInvalidSender } // Drop non-local transactions under our own minimal accepted gas price or tip if !local && tx.GasTipCapIntCmp(pool.gasPrice) < 0 { + meter(Underpriced).Mark(1) return ErrUnderpriced } // Ensure the transaction adheres to nonce ordering if pool.currentState.GetNonce(from) > tx.Nonce() { + meter(NonceTooLow).Mark(1) return core.ErrNonceTooLow } // Transactor should have enough funds to cover the costs @@ -711,6 +724,7 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error { } balance := pool.currentState.GetBalance(from) if balance.Cmp(cost) < 0 { + meter(InsufficientFunds).Mark(1) return core.ErrInsufficientFunds } @@ -728,6 +742,7 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error { } if balance.Cmp(sum) < 0 { log.Trace("Replacing transactions would overdraft", "sender", from, "balance", pool.currentState.GetBalance(from), "required", sum) + meter(Overdraft).Mark(1) return ErrOverdraft } } @@ -735,9 +750,11 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error { // Ensure the transaction has more gas than the basic tx fee. intrGas, err := core.IntrinsicGas(tx.Data(), tx.AccessList(), tx.To() == nil, true, pool.istanbul, pool.shanghai) if err != nil { + meter(GasUnitOverflow).Mark(1) return err } if tx.Gas() < intrGas { + meter(IntrinsicGas).Mark(1) return core.ErrIntrinsicGas } return nil @@ -756,6 +773,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e if pool.all.Get(hash) != nil { log.Trace("Discarding already known transaction", "hash", hash) knownTxMeter.Mark(1) + meter(AlreadyKnown).Mark(1) return false, ErrAlreadyKnown } // Make the local flag. If it's from local source or it's from the network but @@ -778,6 +796,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e if !isLocal && pool.priced.Underpriced(tx) { log.Trace("Discarding underpriced transaction", "hash", hash, "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap()) underpricedTxMeter.Mark(1) + meter(Underpriced).Mark(1) return false, ErrUnderpriced } @@ -787,6 +806,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e // replacements to 25% of the slots if pool.changesSinceReorg > int(pool.config.GlobalSlots/4) { throttleTxMeter.Mark(1) + meter(Throttle).Mark(1) return false, ErrTxPoolOverflow } @@ -799,6 +819,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e if !isLocal && !success { log.Trace("Discarding overflown transaction", "hash", hash) overflowedTxMeter.Mark(1) + meter(Overflow).Mark(1) return false, ErrTxPoolOverflow } @@ -818,6 +839,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e pool.priced.Put(dropTx, false) } log.Trace("Discarding future transaction replacing pending tx", "hash", hash) + meter(FutureReplacePending).Mark(1) return false, ErrFutureReplacePending } } @@ -837,6 +859,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e inserted, old := list.Add(tx, pool.config.PriceBump) if !inserted { pendingDiscardMeter.Mark(1) + meter(ReplaceUnderpriced).Mark(1) return false, ErrReplaceUnderpriced } // New transaction is better, replace old one @@ -902,6 +925,7 @@ func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction, local boo if !inserted { // An older transaction was better, discard this queuedDiscardMeter.Mark(1) + meter(QueuedDiscard).Mark(1) return false, ErrReplaceUnderpriced } // Discard any previous transaction and mark this @@ -916,6 +940,7 @@ func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction, local boo // If the transaction isn't in lookup set but it's expected to be there, // show the error log. if pool.all.Get(hash) == nil && !addAll { + meter(MissingTransaction).Mark(1) log.Error("Missing transaction in lookup set, please report the issue", "hash", hash) } if addAll { @@ -1034,6 +1059,7 @@ func (pool *TxPool) addTxs(txs []*types.Transaction, local, sync bool) []error { if pool.all.Get(tx.Hash()) != nil { errs[i] = ErrAlreadyKnown knownTxMeter.Mark(1) + meter(AlreadyKnown).Mark(1) continue } // Exclude transactions with invalid signatures as soon as @@ -1043,6 +1069,7 @@ func (pool *TxPool) addTxs(txs []*types.Transaction, local, sync bool) []error { if err != nil { errs[i] = ErrInvalidSender invalidTxMeter.Mark(1) + meter(InvalidSender).Mark(1) continue } // Accumulate all unknown transactions for deeper processing diff --git a/eth/handler.go b/eth/handler.go index bcca1ff3b1..5c4709564c 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -672,11 +672,17 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) { directPeers++ directCount += len(hashes) peer.AsyncSendTransactions(hashes) + log.Trace("Transaction broadcast bodies", "txs", len(hashes), + "peer.id", peer.Node().ID().String(), "peer.IP", peer.Node().IP().String(), + ) } for peer, hashes := range annos { annoPeers++ annoCount += len(hashes) peer.AsyncSendPooledTransactionHashes(hashes) + log.Trace("Transaction broadcast hashes", "txs", len(hashes), + "peer.id", peer.Node().ID().String(), "peer.IP", peer.Node().IP().String(), + ) } log.Debug("Transaction broadcast", "txs", len(txs), "announce packs", annoPeers, "announced hashes", annoCount, diff --git a/eth/protocols/eth/broadcast.go b/eth/protocols/eth/broadcast.go index bd07a9ba60..4aff122360 100644 --- a/eth/protocols/eth/broadcast.go +++ b/eth/protocols/eth/broadcast.go @@ -98,7 +98,7 @@ func (p *Peer) broadcastTransactions() { return } close(done) - p.Log().Trace("Sent transactions", "count", len(txs)) + p.Log().Trace("Sent transactions bodies", "count", len(txs), "peer.id", p.Node().ID().String(), "peer.ip", p.Node().IP().String()) }) } } @@ -176,7 +176,7 @@ func (p *Peer) announceTransactions() { } } close(done) - p.Log().Trace("Sent transaction announcements", "count", len(pending)) + p.Log().Trace("Sent transaction announcements", "count", len(pending), "peer.Id", p.ID(), "peer.IP", p.Node().IP().String()) }) } } From adc05230c49f021e6c807e96412bb911e9511d21 Mon Sep 17 00:00:00 2001 From: andyzhang2023 Date: Thu, 30 Nov 2023 16:57:56 +0800 Subject: [PATCH 3/8] fix: transaction broadcasting trace logs --- eth/protocols/eth/broadcast.go | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/eth/protocols/eth/broadcast.go b/eth/protocols/eth/broadcast.go index 4aff122360..fbfc9b5448 100644 --- a/eth/protocols/eth/broadcast.go +++ b/eth/protocols/eth/broadcast.go @@ -18,6 +18,7 @@ package eth import ( "math/big" + "strings" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/gopool" @@ -61,6 +62,22 @@ func (p *Peer) broadcastBlocks() { } } +func collectHashes(txs []*types.Transaction) []common.Hash { + hashes := make([]common.Hash, len(txs)) + for i, tx := range txs { + hashes[i] = tx.Hash() + } + return hashes +} + +func concat(hashes []common.Hash) string { + strslice := make([]string, len(hashes)) + for i, hash := range hashes { + strslice[i] = hash.String() + } + return strings.Join(strslice, ",") +} + // broadcastTransactions is a write loop that schedules transaction broadcasts // to the remote peer. The goal is to have an async writer that does not lock up // node internals and at the same time rate limits queued data. @@ -98,7 +115,7 @@ func (p *Peer) broadcastTransactions() { return } close(done) - p.Log().Trace("Sent transactions bodies", "count", len(txs), "peer.id", p.Node().ID().String(), "peer.ip", p.Node().IP().String()) + p.Log().Trace("Sent transaction bodies", "count", len(txs), "peer.id", p.Node().ID().String(), "peer.ip", p.Node().IP().String(), "hashes", concat(collectHashes(txs))) }) } } @@ -176,7 +193,7 @@ func (p *Peer) announceTransactions() { } } close(done) - p.Log().Trace("Sent transaction announcements", "count", len(pending), "peer.Id", p.ID(), "peer.IP", p.Node().IP().String()) + p.Log().Trace("Sent transaction announcements", "count", len(pending), "peer.Id", p.ID(), "peer.IP", p.Node().IP().String(), "hashes", concat(pending)) }) } } From c63fd7b111a1860d042d8a92ddeba25c7637dabd Mon Sep 17 00:00:00 2001 From: andyzhang2023 Date: Mon, 18 Dec 2023 02:05:52 +0800 Subject: [PATCH 4/8] improve txpool reannounce frequency --- core/txpool/txpool.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/txpool/txpool.go b/core/txpool/txpool.go index 03e679a08e..87e10c7011 100644 --- a/core/txpool/txpool.go +++ b/core/txpool/txpool.go @@ -107,7 +107,7 @@ var ( // It is removed from the tx pool max gas to better indicate that L2 transactions // are not able to consume all of the gas in a L2 block as the L1 info deposit is always present. l1InfoGasOverhead = uint64(70_000) - reannounceInterval = time.Minute // Time interval to check for reannounce transactions + reannounceInterval = 10 * time.Second // Time interval to check for reannounce transactions ) var ( From f84bc2bce3cb38469c684577189032de0b3092bc Mon Sep 17 00:00:00 2001 From: andyzhang2023 Date: Mon, 18 Dec 2023 03:15:00 +0800 Subject: [PATCH 5/8] v0.2.1, improve txpool reannounce freq [3] --- core/txpool/txpool.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/txpool/txpool.go b/core/txpool/txpool.go index cffd5c9c90..3c9889edfc 100644 --- a/core/txpool/txpool.go +++ b/core/txpool/txpool.go @@ -107,7 +107,7 @@ var ( // It is removed from the tx pool max gas to better indicate that L2 transactions // are not able to consume all of the gas in a L2 block as the L1 info deposit is always present. l1InfoGasOverhead = uint64(70_000) - reannounceInterval = 10 * time.Second // Time interval to check for reannounce transactions + reannounceInterval = 1 * time.Second // Time interval to check for reannounce transactions ) var ( From 866a6831dd58d4266101cc03ddb5af74eca54d23 Mon Sep 17 00:00:00 2001 From: andyzhang2023 Date: Mon, 18 Dec 2023 03:32:20 +0800 Subject: [PATCH 6/8] v0.2.1: reannounce all peers improve txpool reannounce freq [4] --- eth/handler.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/eth/handler.go b/eth/handler.go index bcca1ff3b1..28e5f5b0ae 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -692,7 +692,8 @@ func (h *handler) ReannounceTransactions(txs types.Transactions) { } // Announce transactions hash to a batch of peers - peersCount := uint(math.Sqrt(float64(h.peers.len()))) + //peersCount := uint(math.Sqrt(float64(h.peers.len()))) + peersCount := uint(h.peers.len()) peers := h.peers.headPeers(peersCount) for _, peer := range peers { peer.AsyncSendPooledTransactionHashes(hashes) From e86ab2eabdb0070ddf432b7210f5e2a141d628e2 Mon Sep 17 00:00:00 2001 From: andyzhang2023 Date: Mon, 18 Dec 2023 05:14:17 +0800 Subject: [PATCH 7/8] reannounce with bodies --- eth/protocols/eth/broadcast.go | 64 ++++++++++++++++++++++++++++++++++ eth/protocols/eth/peer.go | 2 +- 2 files changed, 65 insertions(+), 1 deletion(-) diff --git a/eth/protocols/eth/broadcast.go b/eth/protocols/eth/broadcast.go index bd07a9ba60..571573bceb 100644 --- a/eth/protocols/eth/broadcast.go +++ b/eth/protocols/eth/broadcast.go @@ -128,6 +128,70 @@ func (p *Peer) broadcastTransactions() { } } +func (p *Peer) announceWithBody() { + var ( + queue []common.Hash // Queue of hashes to broadcast as full transactions + done chan struct{} // Non-nil if background broadcaster is running + fail = make(chan error, 1) // Channel used to receive network error + failed bool // Flag whether a send failed, discard everything onward + ) + for { + // If there's no in-flight broadcast running, check if a new one is needed + if done == nil && len(queue) > 0 { + // Pile transaction until we reach our allowed network limit + var ( + hashesCount uint64 + txs []*types.Transaction + size common.StorageSize + ) + for i := 0; i < len(queue) && size < maxTxPacketSize; i++ { + if tx := p.txpool.Get(queue[i]); tx != nil { + txs = append(txs, tx) + size += common.StorageSize(tx.Size()) + } + hashesCount++ + } + queue = queue[:copy(queue, queue[hashesCount:])] + + // If there's anything available to transfer, fire up an async writer + if len(txs) > 0 { + done = make(chan struct{}) + gopool.Submit(func() { + if err := p.SendTransactions(txs); err != nil { + fail <- err + return + } + close(done) + p.Log().Trace("Sent transactions", "count", len(txs)) + }) + } + } + // Transfer goroutine may or may not have been started, listen for events + select { + case hashes := <-p.txAnnounce: + // If the connection failed, discard all transaction events + if failed { + continue + } + // New batch of transactions to be broadcast, queue them (with cap) + queue = append(queue, hashes...) + if len(queue) > maxQueuedTxs { + // Fancy copy and resize to ensure buffer doesn't grow indefinitely + queue = queue[:copy(queue, queue[len(queue)-maxQueuedTxs:])] + } + + case <-done: + done = nil + + case <-fail: + failed = true + + case <-p.term: + return + } + } +} + // announceTransactions is a write loop that schedules transaction broadcasts // to the remote peer. The goal is to have an async writer that does not lock up // node internals and at the same time rate limits queued data. diff --git a/eth/protocols/eth/peer.go b/eth/protocols/eth/peer.go index 219f486c8e..b3a3b52230 100644 --- a/eth/protocols/eth/peer.go +++ b/eth/protocols/eth/peer.go @@ -115,7 +115,7 @@ func NewPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter, txpool TxPool) *Pe // Start up all the broadcasters go peer.broadcastBlocks() go peer.broadcastTransactions() - go peer.announceTransactions() + go peer.announceWithBody() go peer.dispatcher() return peer From 7c411a386f1f3c3dc446622da597f491a00b68b0 Mon Sep 17 00:00:00 2001 From: andyzhang2023 Date: Thu, 21 Dec 2023 09:33:11 +0800 Subject: [PATCH 8/8] add reannounce metric for txpool --- core/txpool/txpool.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core/txpool/txpool.go b/core/txpool/txpool.go index 03e679a08e..71d9094f6c 100644 --- a/core/txpool/txpool.go +++ b/core/txpool/txpool.go @@ -111,6 +111,8 @@ var ( ) var ( + staledMeter = metrics.NewRegisteredMeter("txpool/staled/count", nil) // staled transactions + // Metrics for the pending pool pendingDiscardMeter = metrics.NewRegisteredMeter("txpool/pending/discard", nil) pendingReplaceMeter = metrics.NewRegisteredMeter("txpool/pending/replace", nil) @@ -451,6 +453,7 @@ func (pool *TxPool) loop() { return txs }() pool.mu.RUnlock() + staledMeter.Mark(int64(len(reannoTxs))) if len(reannoTxs) > 0 { pool.reannoTxFeed.Send(core.ReannoTxsEvent{reannoTxs}) }