Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve txpool reannounce frequency #40

1 change: 1 addition & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ var (
utils.TxPoolGlobalQueueFlag,
utils.TxPoolLifetimeFlag,
utils.TxPoolReannounceTimeFlag,
utils.TxPoolReannounceRemotesFlag,
utils.SyncModeFlag,
utils.SyncTargetFlag,
utils.ExitWhenSyncedFlag,
Expand Down
9 changes: 9 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,12 @@ var (
Value: ethconfig.Defaults.TxPool.ReannounceTime,
Category: flags.TxPoolCategory,
}
TxPoolReannounceRemotesFlag = &cli.BoolFlag{
Name: "txpool.reannounceremotes",
Usage: "Wether reannnounce remote transactions or not(default = false)",
Value: ethconfig.Defaults.TxPool.ReannounceRemotes,
Category: flags.TxPoolCategory,
}

// Performance tuning settings
CacheFlag = &cli.IntFlag{
Expand Down Expand Up @@ -1663,6 +1669,9 @@ func setTxPool(ctx *cli.Context, cfg *txpool.Config) {
if ctx.IsSet(TxPoolReannounceTimeFlag.Name) {
cfg.ReannounceTime = ctx.Duration(TxPoolReannounceTimeFlag.Name)
}
if ctx.IsSet(TxPoolReannounceRemotesFlag.Name) {
cfg.ReannounceRemotes = ctx.Bool(TxPoolReannounceRemotesFlag.Name)
}
}

func setEthash(ctx *cli.Context, cfg *ethconfig.Config) {
Expand Down
68 changes: 68 additions & 0 deletions core/txpool/invalid.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package txpool

import (
"github.com/ethereum/go-ethereum/metrics"
)

const (
AlreadyKnown = "AlreadyKnown"
TypeNotSupportDeposit = "TypeNotSupportDeposit"
TypeNotSupport1559 = "TypeNotSupport1559"
TypeNotSupport2718 = "TypeNotSupport2718"
MissingTransaction = "MissingTransaction"
OversizedData = "OversizedData"
MaxInitCodeSizeExceeded = "MaxInitCodeSizeExceeded"
NegativeValue = "NegativeValue"
GasLimit = "GasLimit"
FeeCapVeryHigh = "FeeCapVeryHigh"
TipVeryHigh = "TipVeryHigh"
TipAboveFeeCap = "TipAboveFeeCap"
InvalidSender = "InvalidSender"
Underpriced = "Underpriced"
NonceTooLow = "NonceTooLow"
InsufficientFunds = "InsufficientFunds"
Overdraft = "Overdraft"
IntrinsicGas = "IntrinsicGas"
Throttle = "Throttle"
Overflow = "Overflow"
FutureReplacePending = "FutureReplacePending"
ReplaceUnderpriced = "ReplaceUnderpriced"
QueuedDiscard = "QueueDiscard"
GasUnitOverflow = "GasUnitOverflow"
)

func meter(err string) metrics.Meter {
return metrics.GetOrRegisterMeter("txpool/invalid/"+err, nil)
}

func init() {
// init the metrics
for _, err := range []string{
AlreadyKnown,
TypeNotSupportDeposit,
TypeNotSupport1559,
TypeNotSupport2718,
MissingTransaction,
OversizedData,
MaxInitCodeSizeExceeded,
NegativeValue,
GasLimit,
FeeCapVeryHigh,
TipVeryHigh,
TipAboveFeeCap,
InvalidSender,
Underpriced,
NonceTooLow,
InsufficientFunds,
Overdraft,
IntrinsicGas,
Throttle,
Overflow,
FutureReplacePending,
ReplaceUnderpriced,
QueuedDiscard,
GasUnitOverflow,
} {
meter(err).Mark(0)
}
}
39 changes: 35 additions & 4 deletions core/txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,12 @@ var (
// It is removed from the tx pool max gas to better indicate that L2 transactions
// are not able to consume all of the gas in a L2 block as the L1 info deposit is always present.
l1InfoGasOverhead = uint64(70_000)
reannounceInterval = time.Minute // Time interval to check for reannounce transactions
reannounceInterval = 1 * time.Second // Time interval to check for reannounce transactions
)

var (
staledMeter = metrics.NewRegisteredMeter("txpool/staled/count", nil) // staled transactions

// Metrics for the pending pool
pendingDiscardMeter = metrics.NewRegisteredMeter("txpool/pending/discard", nil)
pendingReplaceMeter = metrics.NewRegisteredMeter("txpool/pending/replace", nil)
Expand Down Expand Up @@ -183,8 +185,9 @@ type Config struct {
AccountQueue uint64 // Maximum number of non-executable transaction slots permitted per account
GlobalQueue uint64 // Maximum number of non-executable transaction slots for all accounts

Lifetime time.Duration // Maximum amount of time non-executable transaction are queued
ReannounceTime time.Duration // Duration for announcing local pending transactions again
Lifetime time.Duration // Maximum amount of time non-executable transaction are queued
ReannounceTime time.Duration // Duration for announcing local pending transactions again
ReannounceRemotes bool // Wether reannounce remote transactions or not
}

// DefaultConfig contains the default configurations for the transaction
Expand Down Expand Up @@ -433,7 +436,7 @@ func (pool *TxPool) loop() {
reannoTxs := func() []*types.Transaction {
txs := make([]*types.Transaction, 0)
for addr, list := range pool.pending {
if !pool.locals.contains(addr) {
if !pool.config.ReannounceRemotes && !pool.locals.contains(addr) {
continue
}

Expand All @@ -451,6 +454,7 @@ func (pool *TxPool) loop() {
return txs
}()
pool.mu.RUnlock()
staledMeter.Mark(int64(len(reannoTxs)))
if len(reannoTxs) > 0 {
pool.reannoTxFeed.Send(core.ReannoTxsEvent{reannoTxs})
}
Expand Down Expand Up @@ -652,55 +656,68 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
// This is for spam protection, not consensus,
// as the external engine-API user authenticates deposits.
if tx.Type() == types.DepositTxType {
meter(TypeNotSupportDeposit).Mark(1)
return core.ErrTxTypeNotSupported
}
// Accept only legacy transactions until EIP-2718/2930 activates.
if !pool.eip2718 && tx.Type() != types.LegacyTxType {
meter(TypeNotSupport2718).Mark(1)
return core.ErrTxTypeNotSupported
}
// Reject dynamic fee transactions until EIP-1559 activates.
if !pool.eip1559 && tx.Type() == types.DynamicFeeTxType {
meter(TypeNotSupport1559).Mark(1)
return core.ErrTxTypeNotSupported
}
// Reject transactions over defined size to prevent DOS attacks
if tx.Size() > txMaxSize {
meter(OversizedData).Mark(1)
return ErrOversizedData
}
// Check whether the init code size has been exceeded.
if pool.shanghai && tx.To() == nil && len(tx.Data()) > params.MaxInitCodeSize {
meter(MaxInitCodeSizeExceeded).Mark(1)
return fmt.Errorf("%w: code size %v limit %v", core.ErrMaxInitCodeSizeExceeded, len(tx.Data()), params.MaxInitCodeSize)
}
// Transactions can't be negative. This may never happen using RLP decoded
// transactions but may occur if you create a transaction using the RPC.
if tx.Value().Sign() < 0 {
meter(NegativeValue).Mark(1)
return ErrNegativeValue
}
// Ensure the transaction doesn't exceed the current block limit gas.
if pool.currentMaxGas < tx.Gas() {
meter(GasLimit).Mark(1)
return ErrGasLimit
}
// Sanity check for extremely large numbers
if tx.GasFeeCap().BitLen() > 256 {
meter(FeeCapVeryHigh).Mark(1)
return core.ErrFeeCapVeryHigh
}
if tx.GasTipCap().BitLen() > 256 {
meter(TipVeryHigh).Mark(1)
return core.ErrTipVeryHigh
}
// Ensure gasFeeCap is greater than or equal to gasTipCap.
if tx.GasFeeCapIntCmp(tx.GasTipCap()) < 0 {
meter(TipAboveFeeCap).Mark(1)
return core.ErrTipAboveFeeCap
}
// Make sure the transaction is signed properly.
from, err := types.Sender(pool.signer, tx)
if err != nil {
meter(InvalidSender).Mark(1)
return ErrInvalidSender
}
// Drop non-local transactions under our own minimal accepted gas price or tip
if !local && tx.GasTipCapIntCmp(pool.gasPrice) < 0 {
meter(Underpriced).Mark(1)
return ErrUnderpriced
}
// Ensure the transaction adheres to nonce ordering
if pool.currentState.GetNonce(from) > tx.Nonce() {
meter(NonceTooLow).Mark(1)
return core.ErrNonceTooLow
}
// Transactor should have enough funds to cover the costs
Expand All @@ -711,6 +728,7 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
}
balance := pool.currentState.GetBalance(from)
if balance.Cmp(cost) < 0 {
meter(InsufficientFunds).Mark(1)
return core.ErrInsufficientFunds
}

Expand All @@ -728,16 +746,19 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
}
if balance.Cmp(sum) < 0 {
log.Trace("Replacing transactions would overdraft", "sender", from, "balance", pool.currentState.GetBalance(from), "required", sum)
meter(Overdraft).Mark(1)
return ErrOverdraft
}
}

// Ensure the transaction has more gas than the basic tx fee.
intrGas, err := core.IntrinsicGas(tx.Data(), tx.AccessList(), tx.To() == nil, true, pool.istanbul, pool.shanghai)
if err != nil {
meter(GasUnitOverflow).Mark(1)
return err
}
if tx.Gas() < intrGas {
meter(IntrinsicGas).Mark(1)
return core.ErrIntrinsicGas
}
return nil
Expand All @@ -756,6 +777,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e
if pool.all.Get(hash) != nil {
log.Trace("Discarding already known transaction", "hash", hash)
knownTxMeter.Mark(1)
meter(AlreadyKnown).Mark(1)
return false, ErrAlreadyKnown
}
// Make the local flag. If it's from local source or it's from the network but
Expand All @@ -778,6 +800,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e
if !isLocal && pool.priced.Underpriced(tx) {
log.Trace("Discarding underpriced transaction", "hash", hash, "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap())
underpricedTxMeter.Mark(1)
meter(Underpriced).Mark(1)
return false, ErrUnderpriced
}

Expand All @@ -787,6 +810,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e
// replacements to 25% of the slots
if pool.changesSinceReorg > int(pool.config.GlobalSlots/4) {
throttleTxMeter.Mark(1)
meter(Throttle).Mark(1)
return false, ErrTxPoolOverflow
}

Expand All @@ -799,6 +823,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e
if !isLocal && !success {
log.Trace("Discarding overflown transaction", "hash", hash)
overflowedTxMeter.Mark(1)
meter(Overflow).Mark(1)
return false, ErrTxPoolOverflow
}

Expand All @@ -818,6 +843,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e
pool.priced.Put(dropTx, false)
}
log.Trace("Discarding future transaction replacing pending tx", "hash", hash)
meter(FutureReplacePending).Mark(1)
return false, ErrFutureReplacePending
}
}
Expand All @@ -837,6 +863,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e
inserted, old := list.Add(tx, pool.config.PriceBump)
if !inserted {
pendingDiscardMeter.Mark(1)
meter(ReplaceUnderpriced).Mark(1)
return false, ErrReplaceUnderpriced
}
// New transaction is better, replace old one
Expand Down Expand Up @@ -902,6 +929,7 @@ func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction, local boo
if !inserted {
// An older transaction was better, discard this
queuedDiscardMeter.Mark(1)
meter(QueuedDiscard).Mark(1)
return false, ErrReplaceUnderpriced
}
// Discard any previous transaction and mark this
Expand All @@ -916,6 +944,7 @@ func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction, local boo
// If the transaction isn't in lookup set but it's expected to be there,
// show the error log.
if pool.all.Get(hash) == nil && !addAll {
meter(MissingTransaction).Mark(1)
log.Error("Missing transaction in lookup set, please report the issue", "hash", hash)
}
if addAll {
Expand Down Expand Up @@ -1034,6 +1063,7 @@ func (pool *TxPool) addTxs(txs []*types.Transaction, local, sync bool) []error {
if pool.all.Get(tx.Hash()) != nil {
errs[i] = ErrAlreadyKnown
knownTxMeter.Mark(1)
meter(AlreadyKnown).Mark(1)
continue
}
// Exclude transactions with invalid signatures as soon as
Expand All @@ -1043,6 +1073,7 @@ func (pool *TxPool) addTxs(txs []*types.Transaction, local, sync bool) []error {
if err != nil {
errs[i] = ErrInvalidSender
invalidTxMeter.Mark(1)
meter(InvalidSender).Mark(1)
continue
}
// Accumulate all unknown transactions for deeper processing
Expand Down
9 changes: 8 additions & 1 deletion eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -672,11 +672,17 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) {
directPeers++
directCount += len(hashes)
peer.AsyncSendTransactions(hashes)
log.Trace("Transaction broadcast bodies", "txs", len(hashes),
"peer.id", peer.Node().ID().String(), "peer.IP", peer.Node().IP().String(),
)
}
for peer, hashes := range annos {
annoPeers++
annoCount += len(hashes)
peer.AsyncSendPooledTransactionHashes(hashes)
log.Trace("Transaction broadcast hashes", "txs", len(hashes),
"peer.id", peer.Node().ID().String(), "peer.IP", peer.Node().IP().String(),
)
}
log.Debug("Transaction broadcast", "txs", len(txs),
"announce packs", annoPeers, "announced hashes", annoCount,
Expand All @@ -692,7 +698,8 @@ func (h *handler) ReannounceTransactions(txs types.Transactions) {
}

// Announce transactions hash to a batch of peers
peersCount := uint(math.Sqrt(float64(h.peers.len())))
//peersCount := uint(math.Sqrt(float64(h.peers.len())))
peersCount := uint(h.peers.len())
peers := h.peers.headPeers(peersCount)
for _, peer := range peers {
peer.AsyncSendPooledTransactionHashes(hashes)
Expand Down
Loading
Loading