Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core/txpool: make transaction validation reusable across packages (pools) #27429

Merged
merged 4 commits into from Jun 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/geth/main.go
Expand Up @@ -419,7 +419,7 @@ func startNode(ctx *cli.Context, stack *node.Node, backend ethapi.Backend, isCon
}
// Set the gas price to the limits from the CLI and start mining
gasprice := flags.GlobalBig(ctx, utils.MinerGasPriceFlag.Name)
ethBackend.TxPool().SetGasPrice(gasprice)
ethBackend.TxPool().SetGasTip(gasprice)
if err := ethBackend.StartMining(); err != nil {
utils.Fatalf("Failed to start mining: %v", err)
}
Expand Down
180 changes: 55 additions & 125 deletions core/txpool/txpool.go
Expand Up @@ -18,7 +18,6 @@ package txpool

import (
"errors"
"fmt"
"math"
"math/big"
"sort"
Expand Down Expand Up @@ -91,10 +90,6 @@ var (
// ErrFutureReplacePending is returned if a future transaction replaces a pending
// transaction. Future transactions should only be able to replace other future transactions.
ErrFutureReplacePending = errors.New("future transaction tries to replace pending")

// ErrOverdraft is returned if a transaction would cause the senders balance to go negative
// thus invalidating a potential large number of transactions.
ErrOverdraft = errors.New("transaction would cause overdraft")
)

var (
Expand Down Expand Up @@ -178,8 +173,7 @@ type Config struct {
Lifetime time.Duration // Maximum amount of time non-executable transaction are queued
}

// DefaultConfig contains the default configurations for the transaction
// pool.
// DefaultConfig contains the default configurations for the transaction pool.
var DefaultConfig = Config{
Journal: "transactions.rlp",
Rejournal: time.Hour,
Expand Down Expand Up @@ -245,20 +239,15 @@ type TxPool struct {
config Config
chainconfig *params.ChainConfig
chain blockChain
gasPrice *big.Int
gasTip atomic.Pointer[big.Int]
txFeed event.Feed
scope event.SubscriptionScope
signer types.Signer
mu sync.RWMutex

istanbul atomic.Bool // Fork indicator whether we are in the istanbul stage.
eip2718 atomic.Bool // Fork indicator whether we are using EIP-2718 type transactions.
eip1559 atomic.Bool // Fork indicator whether we are using EIP-1559 type transactions.
shanghai atomic.Bool // Fork indicator whether we are in the Shanghai stage.

currentState *state.StateDB // Current state in the blockchain head
pendingNonces *noncer // Pending state tracking virtual nonces
currentMaxGas atomic.Uint64 // Current gas limit for transaction caps
currentHead atomic.Pointer[types.Header] // Current head of the blockchain
currentState *state.StateDB // Current state in the blockchain head
pendingNonces *noncer // Pending state tracking virtual nonces

locals *accountSet // Set of local transaction to exempt from eviction rules
journal *journal // Journal of local transaction to back up to disk
Expand Down Expand Up @@ -286,9 +275,9 @@ type txpoolResetRequest struct {
oldHead, newHead *types.Header
}

// NewTxPool creates a new transaction pool to gather, sort and filter inbound
// New creates a new transaction pool to gather, sort and filter inbound
// transactions from the network.
func NewTxPool(config Config, chainconfig *params.ChainConfig, chain blockChain) *TxPool {
func New(config Config, chainconfig *params.ChainConfig, chain blockChain) *TxPool {
// Sanitize the input to ensure no vulnerable gas prices are set
config = (&config).sanitize()

Expand All @@ -309,8 +298,8 @@ func NewTxPool(config Config, chainconfig *params.ChainConfig, chain blockChain)
reorgDoneCh: make(chan chan struct{}),
reorgShutdownCh: make(chan struct{}),
initDoneCh: make(chan struct{}),
gasPrice: new(big.Int).SetUint64(config.PriceLimit),
}
pool.gasTip.Store(new(big.Int).SetUint64(config.PriceLimit))
pool.locals = newAccountSet(pool.signer)
for _, addr := range config.Locals {
log.Info("Setting new local account", "address", addr)
Expand Down Expand Up @@ -443,33 +432,25 @@ func (pool *TxPool) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subsc
return pool.scope.Track(pool.txFeed.Subscribe(ch))
}

// GasPrice returns the current gas price enforced by the transaction pool.
func (pool *TxPool) GasPrice() *big.Int {
pool.mu.RLock()
defer pool.mu.RUnlock()

return new(big.Int).Set(pool.gasPrice)
}

// SetGasPrice updates the minimum price required by the transaction pool for a
// SetGasTip updates the minimum gas tip required by the transaction pool for a
// new transaction, and drops all transactions below this threshold.
func (pool *TxPool) SetGasPrice(price *big.Int) {
func (pool *TxPool) SetGasTip(tip *big.Int) {
pool.mu.Lock()
defer pool.mu.Unlock()

old := pool.gasPrice
pool.gasPrice = price
// if the min miner fee increased, remove transactions below the new threshold
if price.Cmp(old) > 0 {
old := pool.gasTip.Load()
pool.gasTip.Store(new(big.Int).Set(tip))

// If the min miner fee increased, remove transactions below the new threshold
if tip.Cmp(old) > 0 {
// pool.priced is sorted by GasFeeCap, so we have to iterate through pool.all instead
drop := pool.all.RemotesBelowTip(price)
drop := pool.all.RemotesBelowTip(tip)
for _, tx := range drop {
pool.removeTx(tx.Hash(), false)
}
pool.priced.Removed(len(drop))
}

log.Info("Transaction pool price threshold updated", "price", price)
log.Info("Transaction pool tip threshold updated", "tip", tip)
}

// Nonce returns the next nonce of an account, with all transactions executable
Expand Down Expand Up @@ -556,7 +537,7 @@ func (pool *TxPool) Pending(enforceTips bool) map[common.Address]types.Transacti
// If the miner requests tip enforcement, cap the lists now
if enforceTips && !pool.locals.contains(addr) {
for i, tx := range txs {
if tx.EffectiveGasTipIntCmp(pool.gasPrice, pool.priced.urgent.baseFee) < 0 {
if tx.EffectiveGasTipIntCmp(pool.gasTip.Load(), pool.priced.urgent.baseFee) < 0 {
txs = txs[:i]
break
}
Expand Down Expand Up @@ -598,93 +579,48 @@ func (pool *TxPool) local() map[common.Address]types.Transactions {
// This check is meant as an early check which only needs to be performed once,
// and does not require the pool mutex to be held.
func (pool *TxPool) validateTxBasics(tx *types.Transaction, local bool) error {
// Accept only legacy transactions until EIP-2718/2930 activates.
if !pool.eip2718.Load() && tx.Type() != types.LegacyTxType {
return core.ErrTxTypeNotSupported
}
// Reject dynamic fee transactions until EIP-1559 activates.
if !pool.eip1559.Load() && tx.Type() == types.DynamicFeeTxType {
return core.ErrTxTypeNotSupported
}
// Reject blob transactions forever, those will have their own pool.
if tx.Type() == types.BlobTxType {
return core.ErrTxTypeNotSupported
}
// Reject transactions over defined size to prevent DOS attacks
if tx.Size() > txMaxSize {
return ErrOversizedData
}
// Check whether the init code size has been exceeded.
if pool.shanghai.Load() && tx.To() == nil && len(tx.Data()) > params.MaxInitCodeSize {
return fmt.Errorf("%w: code size %v limit %v", core.ErrMaxInitCodeSizeExceeded, len(tx.Data()), params.MaxInitCodeSize)
}
// Transactions can't be negative. This may never happen using RLP decoded
// transactions but may occur if you create a transaction using the RPC.
if tx.Value().Sign() < 0 {
return ErrNegativeValue
}
// Ensure the transaction doesn't exceed the current block limit gas.
if pool.currentMaxGas.Load() < tx.Gas() {
return ErrGasLimit
}
// Sanity check for extremely large numbers
if tx.GasFeeCap().BitLen() > 256 {
return core.ErrFeeCapVeryHigh
}
if tx.GasTipCap().BitLen() > 256 {
return core.ErrTipVeryHigh
opts := &ValidationOptions{
Config: pool.chainconfig,
Accept: 0 |
1<<types.LegacyTxType |
1<<types.AccessListTxType |
1<<types.DynamicFeeTxType,
MaxSize: txMaxSize,
MinTip: pool.gasTip.Load(),
}
// Ensure gasFeeCap is greater than or equal to gasTipCap.
if tx.GasFeeCapIntCmp(tx.GasTipCap()) < 0 {
return core.ErrTipAboveFeeCap
}
// Make sure the transaction is signed properly.
if _, err := types.Sender(pool.signer, tx); err != nil {
return ErrInvalidSender
}
// Drop non-local transactions under our own minimal accepted gas price or tip
if !local && tx.GasTipCapIntCmp(pool.gasPrice) < 0 {
return ErrUnderpriced
if local {
opts.MinTip = new(big.Int)
}
// Ensure the transaction has more gas than the basic tx fee.
intrGas, err := core.IntrinsicGas(tx.Data(), tx.AccessList(), tx.To() == nil, true, pool.istanbul.Load(), pool.shanghai.Load())
if err != nil {
if err := ValidateTransaction(tx, nil, nil, nil, pool.currentHead.Load(), pool.signer, opts); err != nil {
return err
}
if tx.Gas() < intrGas {
return core.ErrIntrinsicGas
}
return nil
}

// validateTx checks whether a transaction is valid according to the consensus
// rules and adheres to some heuristic limits of the local node (price and size).
func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
// Signature has been checked already, this cannot error.
from, _ := types.Sender(pool.signer, tx)
// Ensure the transaction adheres to nonce ordering
if pool.currentState.GetNonce(from) > tx.Nonce() {
return core.ErrNonceTooLow
}
// Transactor should have enough funds to cover the costs
// cost == V + GP * GL
balance := pool.currentState.GetBalance(from)
if balance.Cmp(tx.Cost()) < 0 {
return core.ErrInsufficientFunds
}

// Verify that replacing transactions will not result in overdraft
list := pool.pending[from]
if list != nil { // Sender already has pending txs
sum := new(big.Int).Add(tx.Cost(), list.totalcost)
if repl := list.txs.Get(tx.Nonce()); repl != nil {
// Deduct the cost of a transaction replaced by this
sum.Sub(sum, repl.Cost())
}
if balance.Cmp(sum) < 0 {
log.Trace("Replacing transactions would overdraft", "sender", from, "balance", pool.currentState.GetBalance(from), "required", sum)
return ErrOverdraft
}
opts := &ValidationOptionsWithState{
State: pool.currentState,

FirstNonceGap: nil, // Pool allows arbitrary arrival order, don't invalidate nonce gaps
ExistingExpenditure: func(addr common.Address) *big.Int {
if list := pool.pending[addr]; list != nil {
return list.totalcost
}
return new(big.Int)
},
ExistingCost: func(addr common.Address, nonce uint64) *big.Int {
if list := pool.pending[addr]; list != nil {
if tx := list.txs.Get(nonce); tx != nil {
return tx.Cost()
}
}
return nil
},
}
if err := ValidateTransactionWithState(tx, pool.signer, opts); err != nil {
return err
}
return nil
}
Expand Down Expand Up @@ -993,7 +929,6 @@ func (pool *TxPool) addTxs(txs []*types.Transaction, local, sync bool) []error {
// Exclude transactions with basic errors, e.g invalid signatures and
// insufficient intrinsic gas as soon as possible and cache senders
// in transactions before obtaining lock

if err := pool.validateTxBasics(tx, local); err != nil {
errs[i] = err
invalidTxMeter.Mark(1)
Expand Down Expand Up @@ -1383,21 +1318,14 @@ func (pool *TxPool) reset(oldHead, newHead *types.Header) {
log.Error("Failed to reset txpool state", "err", err)
return
}
pool.currentHead.Store(newHead)
pool.currentState = statedb
pool.pendingNonces = newNoncer(statedb)
pool.currentMaxGas.Store(newHead.GasLimit)

// Inject any transactions discarded due to reorgs
log.Debug("Reinjecting stale transactions", "count", len(reinject))
core.SenderCacher.Recover(pool.signer, reinject)
pool.addTxsLocked(reinject, false)

// Update all fork indicator by next pending block number.
next := new(big.Int).Add(newHead.Number, big.NewInt(1))
pool.istanbul.Store(pool.chainconfig.IsIstanbul(next))
pool.eip2718.Store(pool.chainconfig.IsBerlin(next))
pool.eip1559.Store(pool.chainconfig.IsLondon(next))
pool.shanghai.Store(pool.chainconfig.IsShanghai(next, uint64(time.Now().Unix())))
}

// promoteExecutables moves transactions that have become processable from the
Expand All @@ -1408,6 +1336,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) []*types.Trans
var promoted []*types.Transaction

// Iterate over all accounts and promote any executable transactions
gasLimit := pool.currentHead.Load().GasLimit
for _, addr := range accounts {
list := pool.queue[addr]
if list == nil {
Expand All @@ -1421,7 +1350,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) []*types.Trans
}
log.Trace("Removed old queued transactions", "count", len(forwards))
// Drop all transactions that are too costly (low balance or out of gas)
drops, _ := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas.Load())
drops, _ := list.Filter(pool.currentState.GetBalance(addr), gasLimit)
for _, tx := range drops {
hash := tx.Hash()
pool.all.Remove(hash)
Expand Down Expand Up @@ -1607,6 +1536,7 @@ func (pool *TxPool) truncateQueue() {
// to trigger a re-heap is this function
func (pool *TxPool) demoteUnexecutables() {
// Iterate over all accounts and demote any non-executable transactions
gasLimit := pool.currentHead.Load().GasLimit
for addr, list := range pool.pending {
nonce := pool.currentState.GetNonce(addr)

Expand All @@ -1618,7 +1548,7 @@ func (pool *TxPool) demoteUnexecutables() {
log.Trace("Removed old pending transaction", "hash", hash)
}
// Drop all transactions that are too costly (low balance or out of gas), and queue any invalids back for later
drops, invalids := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas.Load())
drops, invalids := list.Filter(pool.currentState.GetBalance(addr), gasLimit)
for _, tx := range drops {
hash := tx.Hash()
log.Trace("Removed unpayable pending transaction", "hash", hash)
Expand Down
8 changes: 4 additions & 4 deletions core/txpool/txpool2_test.go
Expand Up @@ -83,7 +83,7 @@ func TestTransactionFutureAttack(t *testing.T) {
config := testTxPoolConfig
config.GlobalQueue = 100
config.GlobalSlots = 100
pool := NewTxPool(config, eip1559Config, blockchain)
pool := New(config, eip1559Config, blockchain)
defer pool.Stop()
fillPool(t, pool)
pending, _ := pool.Stats()
Expand Down Expand Up @@ -116,7 +116,7 @@ func TestTransactionFuture1559(t *testing.T) {
// Create the pool to test the pricing enforcement with
statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(rawdb.NewMemoryDatabase()), nil)
blockchain := newTestBlockChain(1000000, statedb, new(event.Feed))
pool := NewTxPool(testTxPoolConfig, eip1559Config, blockchain)
pool := New(testTxPoolConfig, eip1559Config, blockchain)
defer pool.Stop()

// Create a number of test accounts, fund them and make transactions
Expand Down Expand Up @@ -148,7 +148,7 @@ func TestTransactionZAttack(t *testing.T) {
// Create the pool to test the pricing enforcement with
statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(rawdb.NewMemoryDatabase()), nil)
blockchain := newTestBlockChain(1000000, statedb, new(event.Feed))
pool := NewTxPool(testTxPoolConfig, eip1559Config, blockchain)
pool := New(testTxPoolConfig, eip1559Config, blockchain)
defer pool.Stop()
// Create a number of test accounts, fund them and make transactions
fillPool(t, pool)
Expand Down Expand Up @@ -218,7 +218,7 @@ func BenchmarkFutureAttack(b *testing.B) {
config := testTxPoolConfig
config.GlobalQueue = 100
config.GlobalSlots = 100
pool := NewTxPool(config, eip1559Config, blockchain)
pool := New(config, eip1559Config, blockchain)
defer pool.Stop()
fillPool(b, pool)

Expand Down